use std::any::{Any, TypeId};
use std::hash::Hash;
use nexus_notify::local::LocalNotify;
use nexus_notify::{Events, Token};
use rustc_hash::FxHashMap;
use crate::ctx_pipeline::CtxStepCall;
use crate::handler::Param;
use crate::world::{Registry, Resource, ResourceId, World};
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct DataSource(pub usize);
pub struct ReactorNotify {
notify: LocalNotify,
interests: slab::Slab<Vec<Token>>,
reactor_sources: Vec<Vec<DataSource>>,
reactors: slab::Slab<Option<Box<dyn Reactor>>>,
}
impl std::fmt::Debug for ReactorNotify {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReactorNotify")
.field("num_sources", &self.interests.len())
.field("num_reactors", &self.reactors.len())
.field("notify", &self.notify)
.finish()
}
}
impl ReactorNotify {
pub fn new(source_capacity: usize, reactor_capacity: usize) -> Self {
Self {
notify: LocalNotify::with_capacity(reactor_capacity),
interests: slab::Slab::with_capacity(source_capacity),
reactor_sources: Vec::with_capacity(reactor_capacity),
reactors: slab::Slab::with_capacity(reactor_capacity),
}
}
pub fn register_source(&mut self) -> DataSource {
DataSource(self.interests.insert(Vec::new()))
}
pub fn remove_source(&mut self, source: DataSource) {
if self.interests.contains(source.0) {
self.interests.remove(source.0);
}
}
pub fn create_reactor(&mut self) -> Token {
let key = self.reactors.insert(None);
self.notify.ensure_capacity(key);
if key >= self.reactor_sources.len() {
self.reactor_sources.resize_with(key + 1, Vec::new);
}
Token::new(key)
}
pub fn insert_reactor(
&mut self,
token: Token,
reactor: impl Reactor + 'static,
) -> ReactorRegistration<'_> {
let idx = token.index();
assert!(
self.reactors.contains(idx),
"token {} was not allocated by create_reactor",
idx,
);
assert!(
self.reactors[idx].is_none(),
"token {} was already filled",
idx,
);
self.reactors[idx] = Some(Box::new(reactor));
ReactorRegistration {
token,
notify: self,
}
}
pub fn register<C, Params, F: IntoReactor<C, Params>>(
&mut self,
ctx_fn: impl FnOnce(Token) -> C,
step: F,
registry: &Registry,
) -> ReactorRegistration<'_> {
let key = self.reactors.vacant_key();
let token = Token::new(key);
self.notify.ensure_capacity(key);
if key >= self.reactor_sources.len() {
self.reactor_sources.resize_with(key + 1, Vec::new);
}
let ctx = ctx_fn(token);
let reactor = step.into_reactor(ctx, registry);
let inserted = self.reactors.insert(Some(Box::new(reactor)));
debug_assert_eq!(inserted, key);
ReactorRegistration {
token,
notify: self,
}
}
pub fn register_built(&mut self, reactor: impl Reactor + 'static) -> ReactorRegistration<'_> {
let key = self.reactors.vacant_key();
let token = Token::new(key);
self.notify.ensure_capacity(key);
if key >= self.reactor_sources.len() {
self.reactor_sources.resize_with(key + 1, Vec::new);
}
let inserted = self.reactors.insert(Some(Box::new(reactor)));
debug_assert_eq!(inserted, key);
ReactorRegistration {
token,
notify: self,
}
}
pub fn subscribe(&mut self, reactor: Token, source: DataSource) {
if let Some(subscribers) = self.interests.get_mut(source.0) {
if !subscribers.contains(&reactor) {
subscribers.push(reactor);
let idx = reactor.index();
debug_assert!(
idx < self.reactor_sources.len(),
"reactor_sources missing entry for reactor token {}",
idx,
);
self.reactor_sources[idx].push(source);
}
}
}
pub fn unsubscribe(&mut self, reactor: Token, source: DataSource) {
if let Some(subscribers) = self.interests.get_mut(source.0) {
subscribers.retain(|&t| t != reactor);
}
if let Some(sources) = self.reactor_sources.get_mut(reactor.index()) {
sources.retain(|&s| s != source);
}
}
#[inline]
pub fn mark(&mut self, source: DataSource) {
if let Some(subscribers) = self.interests.get(source.0) {
for &reactor_token in subscribers {
self.notify.mark(reactor_token);
}
}
}
#[inline]
pub(crate) fn poll(&mut self, events: &mut Events) {
self.notify.poll(events);
}
#[inline]
pub(crate) fn take_reactor(&mut self, idx: usize) -> Option<Box<dyn Reactor>> {
self.reactors.get_mut(idx).and_then(Option::take)
}
#[inline]
pub(crate) fn put_reactor(&mut self, idx: usize, reactor: Box<dyn Reactor>) {
self.reactors[idx] = Some(reactor);
}
pub fn remove_reactor(&mut self, token: Token) {
let idx = token.index();
if self.reactors.contains(idx) {
self.reactors.remove(idx);
if let Some(sources) = self.reactor_sources.get_mut(idx) {
for &source in sources.iter() {
if let Some(subscribers) = self.interests.get_mut(source.0) {
subscribers.retain(|&t| t != token);
}
}
sources.clear();
}
}
}
pub fn has_notified(&self) -> bool {
self.notify.has_notified()
}
pub fn notified_count(&self) -> usize {
self.notify.notified_count()
}
pub fn source_count(&self) -> usize {
self.interests.len()
}
pub fn reactor_count(&self) -> usize {
self.reactors.len()
}
}
impl Resource for ReactorNotify {}
pub struct ReactorRegistration<'a> {
token: Token,
notify: &'a mut ReactorNotify,
}
impl ReactorRegistration<'_> {
pub fn subscribe(self, source: DataSource) -> Self {
self.notify.subscribe(self.token, source);
self
}
pub fn token(&self) -> Token {
self.token
}
}
pub trait Reactor: Send {
fn run(&mut self, world: &mut World);
fn name(&self) -> &'static str {
"<unnamed>"
}
}
pub struct ReactorFn<C, F, Params: Param> {
pub ctx: C,
f: F,
state: Params::State,
name: &'static str,
}
pub struct PipelineReactor<C> {
pub ctx: C,
body: Box<dyn CtxStepCall<C, (), Out = ()> + Send>,
}
impl<C: Send + 'static> PipelineReactor<C> {
pub fn new(ctx: C, body: impl CtxStepCall<C, (), Out = ()> + Send + 'static) -> Self {
Self {
ctx,
body: Box::new(body),
}
}
}
impl<C: Send + 'static> Reactor for PipelineReactor<C> {
fn run(&mut self, world: &mut World) {
self.body.call(&mut self.ctx, world, ());
}
fn name(&self) -> &'static str {
std::any::type_name::<C>()
}
}
#[diagnostic::on_unimplemented(
message = "this function cannot be used as a reactor step",
note = "reactor step signature: `fn(&mut C, Params...)` — context first, then resources",
note = "closures with resource parameters are not supported — use a named `fn`"
)]
pub trait IntoReactor<C, Params> {
type Reactor: Reactor + 'static;
fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor;
}
impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> Reactor for ReactorFn<C, F, ()> {
fn run(&mut self, _world: &mut World) {
(self.f)(&mut self.ctx);
}
fn name(&self) -> &'static str {
self.name
}
}
impl<C: Send + 'static, F: FnMut(&mut C) + Send + 'static> IntoReactor<C, ()> for F {
type Reactor = ReactorFn<C, F, ()>;
fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
ReactorFn {
ctx,
f: self,
state: <() as Param>::init(registry),
name: std::any::type_name::<F>(),
}
}
}
macro_rules! impl_into_reactor {
($($P:ident),+) => {
impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
Reactor for ReactorFn<C, F, ($($P,)+)>
where
for<'a> &'a mut F:
FnMut(&mut C, $($P,)+) +
FnMut(&mut C, $($P::Item<'a>,)+),
{
#[allow(non_snake_case)]
fn run(&mut self, world: &mut World) {
#[allow(clippy::too_many_arguments)]
fn call_inner<Ctx, $($P,)+>(
mut f: impl FnMut(&mut Ctx, $($P,)+),
ctx: &mut Ctx,
$($P: $P,)+
) {
f(ctx, $($P,)+);
}
#[cfg(debug_assertions)]
world.clear_borrows();
let ($($P,)+) = unsafe {
<($($P,)+) as Param>::fetch(world, &mut self.state)
};
call_inner(&mut self.f, &mut self.ctx, $($P,)+);
}
fn name(&self) -> &'static str {
self.name
}
}
impl<C: Send + 'static, F: Send + 'static, $($P: Param + 'static),+>
IntoReactor<C, ($($P,)+)> for F
where
for<'a> &'a mut F:
FnMut(&mut C, $($P,)+) +
FnMut(&mut C, $($P::Item<'a>,)+),
{
type Reactor = ReactorFn<C, F, ($($P,)+)>;
fn into_reactor(self, ctx: C, registry: &Registry) -> Self::Reactor {
let state = <($($P,)+) as Param>::init(registry);
{
#[allow(non_snake_case)]
let ($($P,)+) = &state;
registry.check_access(&[
$(
(<$P as Param>::resource_id($P),
std::any::type_name::<$P>()),
)+
]);
}
ReactorFn {
ctx,
f: self,
state,
name: std::any::type_name::<F>(),
}
}
}
};
}
all_tuples!(impl_into_reactor);
#[derive(Default)]
pub struct DeferredRemovals {
tokens: Vec<Token>,
}
impl DeferredRemovals {
pub fn deregister(&mut self, token: Token) {
self.tokens.push(token);
}
#[inline]
pub(crate) fn take(&mut self) -> Vec<Token> {
std::mem::take(&mut self.tokens)
}
#[inline]
pub(crate) fn put(&mut self, tokens: Vec<Token>) {
debug_assert!(tokens.is_empty(), "put() expects a drained Vec");
self.tokens = tokens;
}
pub fn is_empty(&self) -> bool {
self.tokens.is_empty()
}
}
impl Resource for DeferredRemovals {}
#[derive(Default)]
pub struct SourceRegistry {
maps: FxHashMap<TypeId, Box<dyn Any + Send>>,
}
impl SourceRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn insert<K: Hash + Eq + Send + 'static>(&mut self, key: K, source: DataSource) {
self.get_or_create_map::<K>().insert(key, source);
}
pub fn get<K: Hash + Eq + Send + 'static>(&self, key: &K) -> Option<DataSource> {
self.get_map::<K>().and_then(|map| map.get(key)).copied()
}
pub fn remove<K: Hash + Eq + Send + 'static>(&mut self, key: &K) -> Option<DataSource> {
self.get_map_mut::<K>().and_then(|map| map.remove(key))
}
pub fn contains<K: Hash + Eq + Send + 'static>(&self, key: &K) -> bool {
self.get_map::<K>().is_some_and(|map| map.contains_key(key))
}
fn get_map<K: Hash + Eq + Send + 'static>(&self) -> Option<&FxHashMap<K, DataSource>> {
self.maps.get(&TypeId::of::<K>()).map(|boxed| {
boxed
.downcast_ref::<FxHashMap<K, DataSource>>()
.expect("invariant: TypeId matches stored map type")
})
}
fn get_map_mut<K: Hash + Eq + Send + 'static>(
&mut self,
) -> Option<&mut FxHashMap<K, DataSource>> {
self.maps.get_mut(&TypeId::of::<K>()).map(|boxed| {
boxed
.downcast_mut::<FxHashMap<K, DataSource>>()
.expect("invariant: TypeId matches stored map type")
})
}
fn get_or_create_map<K: Hash + Eq + Send + 'static>(
&mut self,
) -> &mut FxHashMap<K, DataSource> {
self.maps
.entry(TypeId::of::<K>())
.or_insert_with(|| Box::<FxHashMap<K, DataSource>>::default())
.downcast_mut::<FxHashMap<K, DataSource>>()
.unwrap()
}
}
impl Resource for SourceRegistry {}
pub struct ReactorSystem {
events: Events,
notify_id: ResourceId,
removals_id: ResourceId,
}
impl ReactorSystem {
pub fn new(world: &World) -> Self {
Self {
events: Events::with_capacity(256),
notify_id: world.id::<ReactorNotify>(),
removals_id: world.id::<DeferredRemovals>(),
}
}
pub fn dispatch(&mut self, world: &mut World) -> bool {
let notify_ptr: *mut ReactorNotify =
unsafe { world.get_mut::<ReactorNotify>(self.notify_id) };
{
let notify = unsafe { &mut *notify_ptr };
notify.poll(&mut self.events);
}
let ran = !self.events.is_empty();
for token in self.events.iter() {
let idx = token.index();
let reactor = {
let notify = unsafe { &mut *notify_ptr };
notify.take_reactor(idx)
};
if let Some(mut reactor) = reactor {
reactor.run(world);
let notify = unsafe { &mut *notify_ptr };
notify.put_reactor(idx, reactor);
}
}
let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
let mut pending = removals.take();
if !pending.is_empty() {
let notify = unsafe { &mut *notify_ptr };
while let Some(token) = pending.pop() {
notify.remove_reactor(token);
}
}
let removals = unsafe { world.get_mut::<DeferredRemovals>(self.removals_id) };
removals.put(pending);
ran
}
pub fn reactor_count(&self, world: &World) -> usize {
world.resource::<ReactorNotify>().reactor_count()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Res, ResMut, WorldBuilder};
#[test]
fn reactor_fn_arity0() {
let wb = WorldBuilder::new();
let mut world = wb.build();
let reg = world.registry();
struct Ctx {
count: u32,
}
fn step(ctx: &mut Ctx) {
ctx.count += 1;
}
let mut reactor = step.into_reactor(Ctx { count: 0 }, reg);
reactor.run(&mut world);
assert_eq!(reactor.ctx.count, 1);
}
#[test]
fn reactor_fn_with_params() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(10);
wb.register::<u32>(0);
let mut world = wb.build();
let reg = world.registry();
struct Ctx {
multiplier: u64,
}
fn step(ctx: &mut Ctx, val: Res<u64>, mut out: ResMut<u32>) {
*out = (*val * ctx.multiplier) as u32;
}
let mut reactor = step.into_reactor(Ctx { multiplier: 5 }, reg);
reactor.run(&mut world);
assert_eq!(*world.resource::<u32>(), 50);
}
fn dummy_reactor() -> ReactorFn<(), fn(&mut ()), ()> {
ReactorFn {
ctx: (),
f: (|_: &mut ()| {}) as fn(&mut ()),
state: (),
name: "dummy",
}
}
#[test]
fn reactor_notify_mark_fans_out() {
let mut notify = ReactorNotify::new(4, 8);
let mut events = Events::with_capacity(8);
let src = notify.register_source();
let a1 = notify.register_built(dummy_reactor()).token();
let a2 = notify.register_built(dummy_reactor()).token();
let _a3 = notify.register_built(dummy_reactor()).token();
notify.subscribe(a1, src);
notify.subscribe(a2, src);
notify.mark(src);
notify.notify.poll(&mut events);
assert_eq!(events.len(), 2);
assert!(events.as_slice().contains(&a1));
assert!(events.as_slice().contains(&a2));
}
#[test]
fn reactor_notify_dedup_across_sources() {
let mut notify = ReactorNotify::new(4, 8);
let mut events = Events::with_capacity(8);
let src1 = notify.register_source();
let src2 = notify.register_source();
let reactor = notify.register_built(dummy_reactor()).token();
notify.subscribe(reactor, src1);
notify.subscribe(reactor, src2);
notify.mark(src1);
notify.mark(src2);
notify.notify.poll(&mut events);
assert_eq!(events.len(), 1);
assert_eq!(events.as_slice()[0], reactor);
}
#[test]
fn reactor_notify_remove_reactor() {
let mut notify = ReactorNotify::new(4, 8);
let mut events = Events::with_capacity(8);
let src = notify.register_source();
struct Ctx;
let token = notify
.register_built(ReactorFn {
ctx: Ctx,
f: (|_: &mut Ctx| {}) as fn(&mut Ctx),
state: (),
name: "test",
})
.token();
notify.subscribe(token, src);
notify.remove_reactor(token);
notify.mark(src);
notify.notify.poll(&mut events);
assert!(events.is_empty());
}
fn notify_mut(world: &World, id: ResourceId) -> &mut ReactorNotify {
unsafe { world.get_mut::<ReactorNotify>(id) }
}
#[test]
fn reactor_system_dispatch() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let reg = world.registry();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
increment: u64,
}
fn step(ctx: &mut Ctx, mut val: ResMut<u64>) {
*val += ctx.increment;
}
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
increment: 10,
},
step,
reg,
)
.subscribe(src);
notify
.register(
|t| Ctx {
_reactor_id: t,
increment: 5,
},
step,
reg,
)
.subscribe(src);
notify_mut(&world, nid).mark(src);
let ran = system.dispatch(&mut world);
assert!(ran);
assert_eq!(*world.resource::<u64>(), 15); }
#[test]
fn reactor_system_deferred_removal() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let reg = world.registry();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
reactor_id: Token,
runs: u64,
}
fn step(ctx: &mut Ctx, mut val: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
*val += 1;
ctx.runs += 1;
if ctx.runs >= 2 {
removals.deregister(ctx.reactor_id);
}
}
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register(
|t| Ctx {
reactor_id: t,
runs: 0,
},
step,
reg,
)
.subscribe(src);
assert_eq!(system.reactor_count(&world), 1);
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 1);
assert_eq!(system.reactor_count(&world), 1);
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 2);
assert_eq!(system.reactor_count(&world), 0);
notify_mut(&world, nid).mark(src);
let ran = system.dispatch(&mut world);
assert!(!ran);
assert_eq!(*world.resource::<u64>(), 2);
}
#[test]
fn reactor_system_only_subscribed_wake() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let reg = world.registry();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let notify = notify_mut(&world, nid);
let btc = notify.register_source();
let eth = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(btc);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 100,
},
step,
reg,
)
.subscribe(eth);
notify_mut(&world, nid).mark(btc);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 10);
notify_mut(&world, nid).mark(eth);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 110);
}
#[test]
fn runtime_registration() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let src = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(src);
src
};
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 10);
{
let reg = world.registry();
notify_mut(&world, nid)
.register(
|t| Ctx {
_reactor_id: t,
value: 100,
},
step,
reg,
)
.subscribe(src);
}
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 120); }
#[test]
fn register_after_remove_reuses_key() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
*out += ctx.value;
if ctx.value == 10 {
removals.deregister(ctx.reactor_id);
}
}
let src = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register(
|t| Ctx {
reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(src);
src
};
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 10);
assert_eq!(system.reactor_count(&world), 0);
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
let token = notify
.register(
|t| Ctx {
reactor_id: t,
value: 100,
},
step,
reg,
)
.token();
notify.subscribe(token, src);
assert_eq!(token.index(), 0); }
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 110); assert_eq!(system.reactor_count(&world), 1); }
#[test]
fn reactor_can_access_actor_notify() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
}
fn step(_ctx: &mut Ctx, notify: ResMut<ReactorNotify>, mut out: ResMut<u64>) {
*out = notify.reactor_count() as u64;
}
let src = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register(|t| Ctx { _reactor_id: t }, step, reg)
.subscribe(src);
src
};
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
}
#[test]
fn multi_instrument_with_shared_source() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(8, 16));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
instrument: &'static str,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += match ctx.instrument {
"BTC" => 100,
"ETH" => 10,
"SOL" => 1,
_ => 0,
};
}
let (btc_md, eth_md, sol_md, positions) = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let btc_md = notify.register_source();
let eth_md = notify.register_source();
let sol_md = notify.register_source();
let positions = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
instrument: "BTC",
},
step,
reg,
)
.subscribe(btc_md)
.subscribe(positions);
notify
.register(
|t| Ctx {
_reactor_id: t,
instrument: "ETH",
},
step,
reg,
)
.subscribe(eth_md)
.subscribe(positions);
notify
.register(
|t| Ctx {
_reactor_id: t,
instrument: "SOL",
},
step,
reg,
)
.subscribe(sol_md)
.subscribe(positions);
(btc_md, eth_md, sol_md, positions)
};
notify_mut(&world, nid).mark(btc_md);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 100);
notify_mut(&world, nid).mark(positions);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 211);
notify_mut(&world, nid).mark(btc_md);
notify_mut(&world, nid).mark(eth_md);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 321);
notify_mut(&world, nid).mark(btc_md);
notify_mut(&world, nid).mark(positions);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 432);
let ran = system.dispatch(&mut world);
assert!(!ran);
assert_eq!(*world.resource::<u64>(), 432);
notify_mut(&world, nid).mark(sol_md);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 433);
}
#[test]
fn per_reactor_fill_routing() {
use std::collections::HashMap;
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(8, 16));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
reactor_id: Token,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.reactor_id.index() as u64 + 1;
}
let mut fill_sources: HashMap<usize, DataSource> = HashMap::new();
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
for _ in 0..3 {
let fill_src = notify.register_source();
let token = notify
.register(|t| Ctx { reactor_id: t }, step, reg)
.subscribe(fill_src)
.token();
fill_sources.insert(token.index(), fill_src);
}
}
let wire_client_id: usize = 1;
let fill_source = fill_sources[&wire_client_id];
notify_mut(&world, nid).mark(fill_source);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 2);
let fill_source = fill_sources[&0];
notify_mut(&world, nid).mark(fill_source);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 3);
}
#[test]
fn dynamic_source_registration() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let btc_md = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let btc_md = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(btc_md);
btc_md
};
notify_mut(&world, nid).mark(btc_md);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 10);
let eth_md = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let eth_md = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 100,
},
step,
reg,
)
.subscribe(eth_md);
eth_md
};
notify_mut(&world, nid).mark(btc_md);
notify_mut(&world, nid).mark(eth_md);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 120); }
#[test]
fn remove_source_and_reuse_slot() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let (src_a, src_b) = {
let reg = world.registry();
let notify = notify_mut(&world, nid);
let src_a = notify.register_source();
let src_b = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(src_a);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 100,
},
step,
reg,
)
.subscribe(src_b);
(src_a, src_b)
};
notify_mut(&world, nid).remove_source(src_a);
notify_mut(&world, nid).mark(src_a);
let ran = system.dispatch(&mut world);
assert!(!ran);
notify_mut(&world, nid).mark(src_b);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 100);
let src_c = notify_mut(&world, nid).register_source();
assert_eq!(src_c.0, src_a.0);
let reg = world.registry();
let notify = notify_mut(&world, nid);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 1,
},
step,
reg,
)
.subscribe(src_c);
notify_mut(&world, nid).mark(src_c);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 101); }
#[test]
fn source_registry_basic() {
let mut registry = SourceRegistry::new();
#[derive(Hash, Eq, PartialEq, Debug)]
struct InstrumentId(u32);
let src_a = DataSource(0);
let src_b = DataSource(1);
registry.insert(InstrumentId(1), src_a);
registry.insert(InstrumentId(2), src_b);
assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
assert_eq!(registry.get(&InstrumentId(2)), Some(src_b));
assert_eq!(registry.get(&InstrumentId(3)), None);
assert!(registry.contains(&InstrumentId(1)));
assert!(!registry.contains(&InstrumentId(3)));
}
#[test]
fn source_registry_multiple_key_types() {
let mut registry = SourceRegistry::new();
#[derive(Hash, Eq, PartialEq)]
struct InstrumentId(u32);
#[derive(Hash, Eq, PartialEq)]
struct StrategyId(u32);
let src_a = DataSource(0);
let src_b = DataSource(1);
registry.insert(InstrumentId(1), src_a);
registry.insert(StrategyId(1), src_b);
assert_eq!(registry.get(&InstrumentId(1)), Some(src_a));
assert_eq!(registry.get(&StrategyId(1)), Some(src_b));
}
#[test]
fn source_registry_tuple_keys() {
let mut registry = SourceRegistry::new();
let src = DataSource(42);
registry.insert(("BTC", "Binance"), src);
assert_eq!(registry.get(&("BTC", "Binance")), Some(src));
assert_eq!(registry.get(&("ETH", "Binance")), None);
}
#[test]
fn source_registry_remove() {
let mut registry = SourceRegistry::new();
let src = DataSource(0);
registry.insert(42u64, src);
assert_eq!(registry.remove(&42u64), Some(src));
assert_eq!(registry.get(&42u64), None);
assert_eq!(registry.remove(&42u64), None); }
#[test]
fn source_registry_integrated_with_reactor_system() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
wb.register(SourceRegistry::new());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
#[derive(Hash, Eq, PartialEq, Clone, Copy)]
struct Instrument(u32);
const BTC: Instrument = Instrument(0);
const ETH: Instrument = Instrument(1);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let btc_src = notify_mut(&world, nid).register_source();
let eth_src = notify_mut(&world, nid).register_source();
world.resource_mut::<SourceRegistry>().insert(BTC, btc_src);
world.resource_mut::<SourceRegistry>().insert(ETH, eth_src);
{
let reg = world.registry();
let btc = world.resource::<SourceRegistry>().get(&BTC).unwrap();
let notify = notify_mut(&world, nid);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(btc);
}
notify_mut(&world, nid).mark(btc_src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 10);
let removed = world.resource_mut::<SourceRegistry>().remove(&BTC);
assert!(removed.is_some());
notify_mut(&world, nid).remove_source(removed.unwrap());
notify_mut(&world, nid).mark(btc_src);
let ran = system.dispatch(&mut world);
assert!(!ran);
}
#[test]
fn source_registry_overwrite_key() {
let mut registry = SourceRegistry::new();
let src_a = DataSource(0);
let src_b = DataSource(1);
registry.insert(42u32, src_a);
assert_eq!(registry.get(&42u32), Some(src_a));
registry.insert(42u32, src_b);
assert_eq!(registry.get(&42u32), Some(src_b));
}
#[test]
fn source_registry_empty_get() {
let registry = SourceRegistry::new();
assert_eq!(registry.get(&42u32), None);
assert!(!registry.contains(&42u32));
}
#[test]
fn source_registry_enum_keys() {
#[derive(Hash, Eq, PartialEq)]
enum Venue {
Binance,
Coinbase,
}
let mut registry = SourceRegistry::new();
let src = DataSource(0);
registry.insert(Venue::Binance, src);
assert_eq!(registry.get(&Venue::Binance), Some(src));
assert_eq!(registry.get(&Venue::Coinbase), None);
}
#[test]
fn source_registry_composite_key() {
#[derive(Hash, Eq, PartialEq)]
struct StrategyId(u32);
#[derive(Hash, Eq, PartialEq)]
struct InstrumentId(u32);
#[derive(Hash, Eq, PartialEq)]
struct VenueId(u32);
let mut registry = SourceRegistry::new();
let src = DataSource(5);
registry.insert((StrategyId(1), InstrumentId(0), VenueId(2)), src);
assert_eq!(
registry.get(&(StrategyId(1), InstrumentId(0), VenueId(2))),
Some(src)
);
assert_eq!(
registry.get(&(StrategyId(2), InstrumentId(0), VenueId(2))),
None
);
}
#[test]
fn full_lifecycle_add_trade_remove() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
wb.register(SourceRegistry::new());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
#[derive(Hash, Eq, PartialEq, Clone, Copy)]
struct Instrument(u32);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let btc_src = notify_mut(&world, nid).register_source();
world
.resource_mut::<SourceRegistry>()
.insert(Instrument(0), btc_src);
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 10,
},
step,
reg,
)
.subscribe(btc_src);
}
notify_mut(&world, nid).mark(btc_src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 10);
let eth_src = notify_mut(&world, nid).register_source();
world
.resource_mut::<SourceRegistry>()
.insert(Instrument(1), eth_src);
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 100,
},
step,
reg,
)
.subscribe(eth_src);
}
notify_mut(&world, nid).mark(btc_src);
notify_mut(&world, nid).mark(eth_src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 120);
let removed = world
.resource_mut::<SourceRegistry>()
.remove(&Instrument(0));
notify_mut(&world, nid).remove_source(removed.unwrap());
notify_mut(&world, nid).mark(eth_src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 220);
let sol_src = notify_mut(&world, nid).register_source();
world
.resource_mut::<SourceRegistry>()
.insert(Instrument(2), sol_src);
assert_eq!(sol_src.0, btc_src.0);
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 1000,
},
step,
reg,
)
.subscribe(sol_src);
}
notify_mut(&world, nid).mark(sol_src);
notify_mut(&world, nid).mark(eth_src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 1320); }
#[test]
fn multi_strategy_same_instrument() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(8, 16));
wb.register(DeferredRemovals::default());
wb.register(SourceRegistry::new());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
#[derive(Hash, Eq, PartialEq, Clone, Copy)]
struct StrategyInstrument(&'static str, &'static str);
struct Ctx {
_reactor_id: Token,
value: u64,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += ctx.value;
}
let reg = world.registry();
let notify = notify_mut(&world, nid);
let mm_btc = notify.register_source();
let mm_eth = notify.register_source();
let arb_btc = notify.register_source();
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 1,
},
step,
reg,
)
.subscribe(mm_btc);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 2,
},
step,
reg,
)
.subscribe(mm_eth);
notify
.register(
|t| Ctx {
_reactor_id: t,
value: 100,
},
step,
reg,
)
.subscribe(arb_btc);
world
.resource_mut::<SourceRegistry>()
.insert(StrategyInstrument("MM", "BTC"), mm_btc);
world
.resource_mut::<SourceRegistry>()
.insert(StrategyInstrument("MM", "ETH"), mm_eth);
world
.resource_mut::<SourceRegistry>()
.insert(StrategyInstrument("ARB", "BTC"), arb_btc);
let mm_btc_src = world
.resource::<SourceRegistry>()
.get(&StrategyInstrument("MM", "BTC"))
.unwrap();
let arb_btc_src = world
.resource::<SourceRegistry>()
.get(&StrategyInstrument("ARB", "BTC"))
.unwrap();
notify_mut(&world, nid).mark(mm_btc_src);
notify_mut(&world, nid).mark(arb_btc_src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 101); }
#[test]
fn reactor_self_removal_with_registry_cleanup() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
wb.register(SourceRegistry::new());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
reactor_id: Token,
}
fn one_shot(ctx: &mut Ctx, mut out: ResMut<u64>, mut removals: ResMut<DeferredRemovals>) {
*out += 1;
removals.deregister(ctx.reactor_id);
}
let src = notify_mut(&world, nid).register_source();
world
.resource_mut::<SourceRegistry>()
.insert("one-shot", src);
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
notify
.register(|t| Ctx { reactor_id: t }, one_shot, reg)
.subscribe(src);
}
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 1);
assert_eq!(system.reactor_count(&world), 0);
assert!(world.resource::<SourceRegistry>().contains(&"one-shot"));
notify_mut(&world, nid).mark(src);
let ran = system.dispatch(&mut world);
assert!(!ran);
}
#[test]
fn many_reactors_same_source() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 64));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
}
fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += 1;
}
let src = notify_mut(&world, nid).register_source();
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
for _ in 0..50 {
notify
.register(|t| Ctx { _reactor_id: t }, step, reg)
.subscribe(src);
}
}
assert_eq!(system.reactor_count(&world), 50);
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 50); }
#[test]
fn reactor_subscribes_to_multiple_sources() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(8, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
}
fn step(_ctx: &mut Ctx, mut out: ResMut<u64>) {
*out += 1;
}
let mut sources = Vec::new();
let notify = notify_mut(&world, nid);
for _ in 0..5 {
sources.push(notify.register_source());
}
{
let reg = world.registry();
let notify = notify_mut(&world, nid);
let mut registration = notify.register(|t| Ctx { _reactor_id: t }, step, reg);
for &src in &sources {
registration = registration.subscribe(src);
}
}
for &src in &sources {
notify_mut(&world, nid).mark(src);
}
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 1); }
#[test]
fn stale_data_source_is_noop() {
let mut wb = WorldBuilder::new();
wb.register(ReactorNotify::new(4, 4));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
let src = notify_mut(&world, nid).register_source();
notify_mut(&world, nid).remove_source(src);
notify_mut(&world, nid).mark(src);
let ran = system.dispatch(&mut world);
assert!(!ran);
}
#[test]
fn double_remove_source_is_noop() {
let mut notify = ReactorNotify::new(4, 4);
let src = notify.register_source();
notify.remove_source(src);
notify.remove_source(src); }
#[test]
fn pipeline_reactor_dispatch() {
use crate::CtxPipelineBuilder;
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
instrument: &'static str,
}
fn read_data(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
let _ = ctx.instrument;
*val
}
fn double(_ctx: &mut Ctx, x: u64) -> u64 {
x * 2
}
fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
*out = x;
}
let reg = world.registry();
let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
.then(read_data, reg)
.then(double, reg)
.then(store, reg)
.build();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
let reactor = PipelineReactor::new(
Ctx {
_reactor_id: Token::new(0),
instrument: "BTC",
},
pipeline,
);
notify.register_built(reactor).subscribe(src);
*world.resource_mut::<u64>() = 10;
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 20); }
#[test]
fn dag_reactor_dispatch() {
use crate::CtxDagBuilder;
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
}
fn root(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
let _ = ctx;
*val
}
fn arm_double(_ctx: &mut Ctx, val: &u64) -> u64 {
*val * 2
}
fn arm_add(_ctx: &mut Ctx, val: &u64) -> u64 {
*val + 10
}
fn merge(_ctx: &mut Ctx, mut out: ResMut<u64>, a: &u64, b: &u64) {
*out = *a + *b;
}
let reg = world.registry();
let dag = CtxDagBuilder::<Ctx, ()>::new()
.root(root, reg)
.fork()
.arm(|seed| seed.then(arm_double, reg))
.arm(|seed| seed.then(arm_add, reg))
.merge(merge, reg)
.build();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
let reactor = PipelineReactor::new(
Ctx {
_reactor_id: Token::new(0),
},
dag,
);
notify.register_built(reactor).subscribe(src);
*world.resource_mut::<u64>() = 5;
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 25);
}
#[test]
fn multiple_pipeline_reactors_different_bodies() {
use crate::CtxPipelineBuilder;
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
factor: u64,
}
fn multiply(ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
*val * ctx.factor
}
fn accumulate(_ctx: &mut Ctx, mut out: ResMut<u64>, val: u64) {
*out += val;
}
let reg = world.registry();
let pipeline_a = CtxPipelineBuilder::<Ctx, ()>::new()
.then(multiply, reg)
.then(accumulate, reg)
.build();
let pipeline_b = CtxPipelineBuilder::<Ctx, ()>::new()
.then(multiply, reg)
.then(accumulate, reg)
.build();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register_built(PipelineReactor::new(
Ctx {
_reactor_id: Token::new(0),
factor: 2,
},
pipeline_a,
))
.subscribe(src);
notify
.register_built(PipelineReactor::new(
Ctx {
_reactor_id: Token::new(1),
factor: 10,
},
pipeline_b,
))
.subscribe(src);
*world.resource_mut::<u64>() = 5;
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
let val = *world.resource::<u64>();
assert!(val > 5, "both reactors should have run, got {val}");
}
#[test]
fn pipeline_reactor_with_guard() {
use crate::CtxPipelineBuilder;
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let nid = world.id::<ReactorNotify>();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
threshold: u64,
}
fn read(_ctx: &mut Ctx, val: Res<u64>, _input: ()) -> u64 {
*val
}
fn above_threshold(ctx: &mut Ctx, val: &u64) -> bool {
*val > ctx.threshold
}
fn write(_ctx: &mut Ctx, mut out: ResMut<u64>, _val: u64) {
*out = 999;
}
let reg = world.registry();
let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
.then(read, reg)
.guard(above_threshold, reg)
.map(write, reg)
.build();
let notify = notify_mut(&world, nid);
let src = notify.register_source();
notify
.register_built(PipelineReactor::new(
Ctx {
_reactor_id: Token::new(0),
threshold: 10,
},
pipeline,
))
.subscribe(src);
*world.resource_mut::<u64>() = 5;
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 5);
*world.resource_mut::<u64>() = 20;
notify_mut(&world, nid).mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 999);
}
#[test]
fn two_phase_registration_safe_api() {
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let mut system = ReactorSystem::new(&world);
struct Ctx {
reactor_id: Token,
instrument: &'static str,
}
fn step(ctx: &mut Ctx, mut out: ResMut<u64>) {
let _ = ctx.instrument;
*out += ctx.reactor_id.index() as u64 + 1;
}
let src = world.resource_mut::<ReactorNotify>().register_source();
let token = world.resource_mut::<ReactorNotify>().create_reactor();
let reactor = step.into_reactor(
Ctx {
reactor_id: token,
instrument: "BTC",
},
world.registry(),
);
world
.resource_mut::<ReactorNotify>()
.insert_reactor(token, reactor)
.subscribe(src);
world.resource_mut::<ReactorNotify>().mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 1);
let token2 = world.resource_mut::<ReactorNotify>().create_reactor();
let actor2 = step.into_reactor(
Ctx {
reactor_id: token2,
instrument: "ETH",
},
world.registry(),
);
world
.resource_mut::<ReactorNotify>()
.insert_reactor(token2, actor2)
.subscribe(src);
world.resource_mut::<ReactorNotify>().mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 4); }
#[test]
fn two_phase_with_pipeline_reactor() {
use crate::CtxPipelineBuilder;
let mut wb = WorldBuilder::new();
wb.register::<u64>(0);
wb.register(ReactorNotify::new(4, 8));
wb.register(DeferredRemovals::default());
let mut world = wb.build();
let mut system = ReactorSystem::new(&world);
struct Ctx {
_reactor_id: Token,
}
fn read(ctx: &mut Ctx, val: Res<u64>, _: ()) -> u64 {
let _ = ctx;
*val
}
fn double(_ctx: &mut Ctx, x: u64) -> u64 {
x * 2
}
fn store(_ctx: &mut Ctx, mut out: ResMut<u64>, x: u64) {
*out = x;
}
let src = world.resource_mut::<ReactorNotify>().register_source();
let token = world.resource_mut::<ReactorNotify>().create_reactor();
let reg = world.registry();
let pipeline = CtxPipelineBuilder::<Ctx, ()>::new()
.then(read, reg)
.then(double, reg)
.then(store, reg)
.build();
let reactor = PipelineReactor::new(Ctx { _reactor_id: token }, pipeline);
world
.resource_mut::<ReactorNotify>()
.insert_reactor(token, reactor)
.subscribe(src);
*world.resource_mut::<u64>() = 10;
world.resource_mut::<ReactorNotify>().mark(src);
system.dispatch(&mut world);
assert_eq!(*world.resource::<u64>(), 20); }
}