use std::collections::{HashMap, HashSet, VecDeque};
use crate::ecs::commands::Commands;
use crate::ecs::component::{ComponentId, ComponentIdRegistry};
use crate::ecs::processor::{Processor, ProcessorContext};
use crate::ecs::universe::Universe;
/// One entry in the child list of a [`Stage`] or [`ProcessorGroup`].
pub enum StageNode {
/// A single boxed processor.
Processor(Box<dyn Processor>),
/// A named group that owns further child nodes.
Group(ProcessorGroup),
}
/// A named collection of processors and nested groups.
pub struct ProcessorGroup {
    name: String,
    children: Vec<StageNode>,
}

impl ProcessorGroup {
    /// Creates an empty group called `name`.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        ProcessorGroup {
            name,
            children: vec![],
        }
    }

    /// Returns the name this group was created with.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Boxes `processor` and appends it as the last child of this group.
    pub fn add_processor(&mut self, processor: impl Processor) {
        let boxed: Box<dyn Processor> = Box::new(processor);
        self.children.push(StageNode::Processor(boxed));
    }

    /// Creates a nested group called `name`, lets `f` populate it, and then
    /// appends it as the last child of this group.
    pub fn add_group(&mut self, name: impl Into<String>, f: impl FnOnce(&mut ProcessorGroup)) {
        let mut nested = Self::new(name);
        f(&mut nested);
        let node = StageNode::Group(nested);
        self.children.push(node);
    }
}
/// A named, ordered list of child nodes owned by a schedule.
pub struct Stage {
    name: String,
    children: Vec<StageNode>,
}

impl Stage {
    /// Creates a stage called `name` with no children.
    pub fn new(name: impl Into<String>) -> Self {
        Stage {
            children: Vec::new(),
            name: name.into(),
        }
    }

    /// Returns the name this stage was created with.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Boxes `processor` and appends it as the last child of this stage.
    pub fn add_processor(&mut self, processor: impl Processor) {
        let node = StageNode::Processor(Box::new(processor));
        self.children.push(node);
    }

    /// Creates a group called `name`, lets `f` populate it, and then appends
    /// it as the last child of this stage.
    pub fn add_group(&mut self, name: impl Into<String>, f: impl FnOnce(&mut ProcessorGroup)) {
        let mut populated = ProcessorGroup::new(name);
        f(&mut populated);
        let node = StageNode::Group(populated);
        self.children.push(node);
    }
}
/// An ordered list of stages that are run one after another.
pub struct Schedule {
    stages: Vec<Stage>,
}

impl Schedule {
    /// Creates a schedule containing a single stage named `"Update"`.
    pub fn new() -> Self {
        let update = Stage::new("Update");
        Schedule {
            stages: vec![update],
        }
    }

    /// Appends a stage called `name` unless a stage with that name already
    /// exists, in which case the schedule is left untouched.
    pub fn add_stage(&mut self, name: &str) -> &mut Self {
        let already_present = self.stages.iter().any(|stage| stage.name == name);
        if !already_present {
            self.stages.push(Stage::new(name));
        }
        self
    }

    /// Appends `processor` to the stage called `stage_name`.
    ///
    /// # Panics
    ///
    /// Panics if no stage with that name exists.
    pub fn add_processor_to_stage(
        &mut self,
        stage_name: &str,
        processor: impl Processor,
    ) -> &mut Self {
        self.stage_mut(stage_name).add_processor(processor);
        self
    }

    /// Appends `processor` to the `"Update"` stage.
    pub fn add_processor(&mut self, processor: impl Processor) -> &mut Self {
        self.add_processor_to_stage("Update", processor)
    }

    /// Creates a group called `group_name`, lets `f` populate it, and appends
    /// it to the stage called `stage_name`.
    ///
    /// # Panics
    ///
    /// Panics if no stage with that name exists.
    pub fn add_group_to_stage(
        &mut self,
        stage_name: &str,
        group_name: impl Into<String>,
        f: impl FnOnce(&mut ProcessorGroup),
    ) -> &mut Self {
        self.stage_mut(stage_name).add_group(group_name, f);
        self
    }

    /// Creates a group called `group_name`, lets `f` populate it, and appends
    /// it to the `"Update"` stage.
    pub fn add_group(
        &mut self,
        group_name: impl Into<String>,
        f: impl FnOnce(&mut ProcessorGroup),
    ) -> &mut Self {
        self.add_group_to_stage("Update", group_name, f)
    }

    /// Runs every stage once against `universe`, in the order the stages were
    /// added.
    pub fn run(&mut self, universe: &mut Universe) {
        for stage in self.stages.iter_mut() {
            run_stage(stage, universe);
        }
    }

    /// Looks up the stage called `stage_name`, panicking if it is missing.
    fn stage_mut(&mut self, stage_name: &str) -> &mut Stage {
        match self.stages.iter_mut().find(|stage| stage.name == stage_name) {
            Some(stage) => stage,
            None => panic!("stage '{}' not found", stage_name),
        }
    }
}

impl Default for Schedule {
    /// Equivalent to [`Schedule::new`].
    fn default() -> Self {
        Schedule::new()
    }
}
/// Walks `nodes` depth-first and pushes a raw pointer to every processor box
/// onto `out`, descending into nested groups in place.
///
/// The pointers refer to the boxes stored inside `nodes`; this function only
/// creates them and never dereferences them.
fn flatten_nodes_to_ptrs(nodes: &mut [StageNode], out: &mut Vec<*mut Box<dyn Processor>>) {
    for node in nodes.iter_mut() {
        match node {
            StageNode::Group(group) => {
                flatten_nodes_to_ptrs(group.children.as_mut_slice(), out);
            }
            StageNode::Processor(boxed) => {
                let raw: *mut Box<dyn Processor> = boxed;
                out.push(raw);
            }
        }
    }
}
/// The component ids one processor reports, stored as sets so that
/// `conflicts` can test membership.
struct ProcessorAccess {
/// Ids collected from the processor's `reads`.
reads: HashSet<ComponentId>,
/// Ids collected from the processor's `writes`.
writes: HashSet<ComponentId>,
}
fn gather_access(
processor: &dyn Processor,
registry: &ComponentIdRegistry,
) -> ProcessorAccess {
ProcessorAccess {
reads: processor.reads(registry).into_iter().collect(),
writes: processor.writes(registry).into_iter().collect(),
}
}
fn conflicts(a: &ProcessorAccess, b: &ProcessorAccess) -> bool {
for w in &a.writes {
if b.reads.contains(w) || b.writes.contains(w) {
return true;
}
}
for w in &b.writes {
if a.reads.contains(w) || a.writes.contains(w) {
return true;
}
}
false
}
fn topological_sort(processors: &[&dyn Processor]) -> Vec<usize> {
let n = processors.len();
if n == 0 {
return Vec::new();
}
let name_to_idx: HashMap<&str, usize> = processors
.iter()
.enumerate()
.filter(|(_, p)| !p.processor_name().is_empty())
.map(|(i, p)| (p.processor_name(), i))
.collect();
let mut edges: Vec<HashSet<usize>> = vec![HashSet::new(); n];
let mut in_degree: Vec<usize> = vec![0; n];
for (i, proc) in processors.iter().enumerate() {
for dep in proc.execute_after() {
if let Some(&j) = name_to_idx.get(dep) {
if edges[j].insert(i) {
in_degree[i] += 1;
}
}
}
for dep in proc.execute_before() {
if let Some(&j) = name_to_idx.get(dep) {
if edges[i].insert(j) {
in_degree[j] += 1;
}
}
}
}
let mut queue: VecDeque<usize> = (0..n).filter(|&i| in_degree[i] == 0).collect();
let mut sorted = Vec::with_capacity(n);
while let Some(i) = queue.pop_front() {
sorted.push(i);
for &j in &edges[i] {
in_degree[j] -= 1;
if in_degree[j] == 0 {
queue.push_back(j);
}
}
}
if sorted.len() != n {
let cycle_members: Vec<&str> = (0..n)
.filter(|&i| in_degree[i] > 0)
.map(|i| {
let name = processors[i].processor_name();
if name.is_empty() { "<unnamed>" } else { name }
})
.collect();
panic!(
"cycle detected in processor dependencies: [{}]",
cycle_members.join(", ")
);
}
sorted
}
/// Groups processors into waves: lists of processor indices in which no two
/// members conflict according to `conflicts`.
///
/// Processors are visited in `topo_order`. Each is placed in the first
/// existing wave that (a) comes strictly after the wave of every explicit
/// predecessor declared via `execute_after` / `execute_before`, and (b)
/// contains no processor whose access conflicts with it. When no such wave
/// exists, a new wave is appended.
///
/// `accesses` and `processors` are indexed by the same processor index.
///
/// NOTE(review): placement is first-fit, so a processor may land in an
/// earlier wave than a conflicting processor that precedes it in
/// `topo_order` when no explicit ordering links the two. Confirm whether
/// callers rely on registration order between conflicting processors.
fn partition_into_waves(
    accesses: &[ProcessorAccess],
    topo_order: &[usize],
    processors: &[&dyn Processor],
) -> Vec<Vec<usize>> {
    let count = accesses.len();
    if count == 0 {
        return Vec::new();
    }

    // Named processors only; a later duplicate name replaces an earlier one.
    let mut index_by_name: HashMap<&str, usize> = HashMap::new();
    for (idx, proc) in processors.iter().enumerate() {
        if !proc.processor_name().is_empty() {
            index_by_name.insert(proc.processor_name(), idx);
        }
    }

    // predecessors[x] = every processor that must have run before x.
    let mut predecessors: Vec<HashSet<usize>> = vec![HashSet::new(); count];
    for (idx, proc) in processors.iter().enumerate() {
        for dep in proc.execute_after() {
            if let Some(&earlier) = index_by_name.get(dep) {
                predecessors[idx].insert(earlier);
            }
        }
        for dep in proc.execute_before() {
            if let Some(&later) = index_by_name.get(dep) {
                predecessors[later].insert(idx);
            }
        }
    }

    let mut wave_of: Vec<usize> = vec![0; count];
    let mut waves: Vec<Vec<usize>> = Vec::new();
    for &current in topo_order {
        // One past the latest wave that holds an explicit predecessor.
        let earliest = predecessors[current]
            .iter()
            .map(|&pred| wave_of[pred] + 1)
            .max()
            .unwrap_or(0);

        let slot = waves
            .iter()
            .enumerate()
            .skip(earliest)
            .find(|(_, members)| {
                !members
                    .iter()
                    .any(|&other| conflicts(&accesses[current], &accesses[other]))
            })
            .map(|(wave_idx, _)| wave_idx);

        match slot {
            Some(wave_idx) => {
                waves[wave_idx].push(current);
                wave_of[current] = wave_idx;
            }
            None => {
                wave_of[current] = waves.len();
                waves.push(vec![current]);
            }
        }
    }
    waves
}
/// A raw `*mut T` wrapped so that it can be moved into closures that must be
/// `Send`.
///
/// The wrapper performs no checks of its own: it neither owns the pointee nor
/// tracks how many copies of the pointer exist.
struct SendPtr<T> {
    ptr: *mut T,
}

impl<T> SendPtr<T> {
    /// Wraps `ptr` without dereferencing it.
    fn new(ptr: *mut T) -> Self {
        Self { ptr }
    }

    /// Turns the wrapped pointer into a mutable reference.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the pointer is non-null, aligned and
    /// points to a live `T`, and that for as long as the returned reference
    /// is in use no other reference to the same `T` is used. Because
    /// `SendPtr` is `Copy`, nothing in the type prevents several copies from
    /// being dereferenced at once; upholding exclusivity is entirely up to
    /// the caller.
    unsafe fn as_mut(&self) -> &mut T {
        // SAFETY: validity and exclusivity are promised by the caller, as
        // required by this function's contract.
        unsafe { &mut *self.ptr }
    }
}

// Implemented by hand because `#[derive(Clone, Copy)]` would add `T: Clone` /
// `T: Copy` bounds, and only the pointer is copied, never the pointee.
impl<T> Clone for SendPtr<T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for SendPtr<T> {}

// SAFETY: these impls only assert that the pointer value itself may cross
// threads. Every dereference goes through the `unsafe fn as_mut`, whose
// caller is responsible for avoiding data races on the pointee.
unsafe impl<T> Send for SendPtr<T> {}
unsafe impl<T> Sync for SendPtr<T> {}
/// Runs the processors behind `ptrs` once against `universe`.
///
/// The processors are ordered with `topological_sort`, partitioned into waves
/// with `partition_into_waves`, and the waves are then run one after another.
/// A wave holding a single processor runs on the calling thread; a larger
/// wave spawns one task per processor inside `rayon::scope`. All processors
/// share one `Commands` buffer, which is applied to `universe` only after the
/// last wave has finished.
///
/// Does nothing when `ptrs` is empty.
///
/// Every pointer in `ptrs` is dereferenced, so each must point to a live
/// `Box<dyn Processor>` that is not otherwise accessed during the call. In
/// this file the only caller is `run_group`, which fills `ptrs` from a group
/// it holds by `&mut`.
///
/// # Panics
///
/// Propagates the panic raised by `topological_sort` when the processors'
/// ordering constraints form a cycle.
fn run_processor_ptrs(ptrs: &mut [*mut Box<dyn Processor>], universe: &mut Universe) {
if ptrs.is_empty() {
return;
}
// SAFETY: relies on the caller's guarantee that every pointer is valid.
// These shared views are used only for the planning steps below and are not
// touched again once processors start running.
let processors_ref: Vec<&dyn Processor> = ptrs.iter().map(|&p| unsafe { &**p }).collect();
let topo_order = topological_sort(&processors_ref);
let accesses: Vec<ProcessorAccess> = processors_ref
.iter()
.map(|p| gather_access(*p, universe.registry()))
.collect();
let waves = partition_into_waves(&accesses, &topo_order, &processors_ref);
// One deferred-command buffer shared by every processor in this call.
let commands = Commands::new();
for wave in &waves {
if wave.len() == 1 {
// Single processor: run it directly on this thread.
let idx = wave[0];
// SAFETY: relies on the caller's guarantee for `ptrs`; this is the only
// reference to this processor that is in use while it runs.
let proc = unsafe { &mut *ptrs[idx] };
let mut ctx = ProcessorContext::new(universe, &commands);
proc.run(&mut ctx);
} else {
// Several processors: wrap the raw pointers so they can be moved into the
// spawned tasks.
let universe_ptr = SendPtr::new(universe as *mut Universe);
let commands_ref = &commands;
let wave_ptrs: Vec<SendPtr<Box<dyn Processor>>> = wave
.iter()
.map(|&idx| SendPtr::new(ptrs[idx]))
.collect();
// Per rayon's documentation, `scope` returns only after every task spawned
// on it has completed, so the next wave starts after this one is done.
rayon::scope(|s| {
for proc_ptr in wave_ptrs {
let uptr = universe_ptr;
s.spawn(move |_| {
// NOTE(review): every task in the wave turns the same pointer into its own
// `&mut Universe`, so several mutable references to the universe are live
// at once. The wave partitioning only guarantees that the *declared*
// component accesses do not conflict; confirm that this is sufficient for
// `Universe` and `ProcessorContext`.
let universe_ref = unsafe { uptr.as_mut() };
// SAFETY: each index appears once in a wave, so no two tasks receive the
// same processor pointer (assuming `ptrs` itself holds no duplicates).
let proc_ref = unsafe { proc_ptr.as_mut() };
let mut ctx = ProcessorContext::new(universe_ref, commands_ref);
proc_ref.run(&mut ctx);
});
}
});
}
}
// Deferred commands from all waves are applied together, after the last wave.
commands.apply(universe);
}
/// Flattens `group` (including any nested groups) into one list of processor
/// pointers and hands that list to `run_processor_ptrs`.
fn run_group(group: &mut ProcessorGroup, universe: &mut Universe) {
    let mut flattened = Vec::<*mut Box<dyn Processor>>::new();
    flatten_nodes_to_ptrs(group.children.as_mut_slice(), &mut flattened);
    run_processor_ptrs(flattened.as_mut_slice(), universe);
}
/// Runs the children of `stage` one at a time, in insertion order.
///
/// A processor added directly to the stage runs on its own, with a fresh
/// `Commands` buffer that is applied to `universe` as soon as the processor
/// returns. A group is delegated to `run_group`.
fn run_stage(stage: &mut Stage, universe: &mut Universe) {
    for node in stage.children.iter_mut() {
        match node {
            StageNode::Group(group) => run_group(group, universe),
            StageNode::Processor(processor) => {
                let commands = Commands::new();
                let mut ctx = ProcessorContext::new(universe, &commands);
                processor.run(&mut ctx);
                commands.apply(universe);
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ecs::component::Component;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
// Test-only component holding two integers.
#[derive(Debug, Clone, PartialEq)]
struct Pos(i32, i32);
impl Component for Pos {}
// Test-only component holding two integers.
#[derive(Debug, Clone, PartialEq)]
struct Vel(i32, i32);
impl Component for Vel {}
// Test-only component holding one integer.
#[derive(Debug, Clone, PartialEq)]
struct Hp(i32);
impl Component for Hp {}
// Test processor that declares a read of `Vel` and a write of `Pos`; its
// `run` only iterates the `(Pos, Vel)` query without changing anything.
struct MoveProcessor;

impl Processor for MoveProcessor {
    fn reads(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
        let vel = registry.id_for::<Vel>();
        vec![vel]
    }

    fn writes(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
        let pos = registry.id_for::<Pos>();
        vec![pos]
    }

    fn run(&mut self, ctx: &mut ProcessorContext) {
        for (_entity, position, velocity) in ctx.universe_mut().query::<(&Pos, &Vel)>() {
            // The values are bound only to show that the query yields them.
            let _position = position;
            let _velocity = velocity;
        }
    }
}
// Test processor that declares no reads and a write of `Hp`; its `run` only
// borrows the universe.
struct HealProcessor;

impl Processor for HealProcessor {
    fn reads(&self, _registry: &ComponentIdRegistry) -> Vec<ComponentId> {
        Vec::new()
    }

    fn writes(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
        let hp = registry.id_for::<Hp>();
        vec![hp]
    }

    fn run(&mut self, ctx: &mut ProcessorContext) {
        let _ = ctx.universe();
    }
}
// Builds `n` boxed processors that declare no component access, override no
// naming or ordering methods, and do nothing when run.
fn make_no_dep_processors(n: usize) -> Vec<Box<dyn Processor>> {
    struct Noop;

    impl Processor for Noop {
        fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn writes(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn run(&mut self, _: &mut ProcessorContext) {}
    }

    let mut boxed: Vec<Box<dyn Processor>> = Vec::new();
    for _ in 0..n {
        boxed.push(Box::new(Noop));
    }
    boxed
}
// Two accesses that share no component must end up in a single wave.
#[test]
fn wave_partitioning_disjoint() {
    let universe = Universe::new();
    let registry = universe.registry();

    // Reads Vel, writes Pos.
    let mover = ProcessorAccess {
        reads: std::iter::once(registry.id_for::<Vel>()).collect(),
        writes: std::iter::once(registry.id_for::<Pos>()).collect(),
    };
    // Reads nothing, writes Hp.
    let healer = ProcessorAccess {
        reads: HashSet::new(),
        writes: std::iter::once(registry.id_for::<Hp>()).collect(),
    };

    let boxed = make_no_dep_processors(2);
    let processors: Vec<&dyn Processor> = boxed.iter().map(|p| p.as_ref()).collect();
    let order: Vec<usize> = vec![0, 1];

    let waves = partition_into_waves(&[mover, healer], &order, &processors);

    assert_eq!(waves.len(), 1, "disjoint processors should be in the same wave");
    assert_eq!(waves[0].len(), 2);
}
// A writer and a reader of the same component must be split into two waves.
#[test]
fn wave_partitioning_conflicting() {
    let universe = Universe::new();
    let registry = universe.registry();

    let writer = ProcessorAccess {
        reads: HashSet::new(),
        writes: std::iter::once(registry.id_for::<Pos>()).collect(),
    };
    let reader = ProcessorAccess {
        reads: std::iter::once(registry.id_for::<Pos>()).collect(),
        writes: HashSet::new(),
    };

    let boxed = make_no_dep_processors(2);
    let processors: Vec<&dyn Processor> = boxed.iter().map(|p| p.as_ref()).collect();
    let order: Vec<usize> = vec![0, 1];

    let waves = partition_into_waves(&[writer, reader], &order, &processors);

    assert_eq!(waves.len(), 2, "conflicting processors should be in separate waves");
}
// Smoke test: processors added directly to the default stage run without
// panicking.
#[test]
fn schedule_runs_processors() {
    let mut universe = Universe::new();
    universe.spawn((Pos(0, 0), Vel(1, 1)));
    universe.spawn((Hp(100),));

    let mut schedule = Schedule::new();
    schedule
        .add_processor(MoveProcessor)
        .add_processor(HealProcessor);

    schedule.run(&mut universe);
}
// Smoke test: processors added through a group run without panicking.
#[test]
fn schedule_runs_processors_in_groups() {
    let mut universe = Universe::new();
    universe.spawn((Pos(0, 0), Vel(1, 1)));
    universe.spawn((Hp(100),));

    let mut schedule = Schedule::new();
    schedule.add_group("Physics", |physics| {
        physics.add_processor(MoveProcessor);
        physics.add_processor(HealProcessor);
    });

    schedule.run(&mut universe);
}
// An `execute_after` constraint must hold between a processor in an outer
// group and one in a group nested inside it.
#[test]
fn schedule_runs_nested_groups() {
    let counter = Arc::new(AtomicU32::new(0));
    let observed = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Builds a named processor that must run after everything in `after`.
    let ordered = |name: &'static str, after: Vec<&'static str>| OrderedProcessor {
        name,
        after,
        before: vec![],
        counter: counter.clone(),
        observed: observed.clone(),
    };

    let mut schedule = Schedule::new();
    schedule.add_group_to_stage("Update", "Outer", |outer| {
        outer.add_processor(ordered("A", vec![]));
        outer.add_group("Inner", |inner| {
            inner.add_processor(ordered("B", vec!["A"]));
        });
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    let log = observed.lock().unwrap();
    let position = |name: &str| log.iter().find(|entry| entry.0 == name).unwrap().1;
    let (a_order, b_order) = (position("A"), position("B"));
    assert!(a_order < b_order, "A (outer) must run before B (nested group)");
}
// Stages run in the order they were added: the "Update" processor must see
// the counter at 0 and the "PostUpdate" processor must see it at 1.
#[test]
fn schedule_multiple_stages() {
    struct CountProcessor {
        counter: Arc<AtomicU32>,
        expected_order: u32,
    }

    impl Processor for CountProcessor {
        fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn writes(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn run(&mut self, _ctx: &mut ProcessorContext) {
            let seen = self.counter.fetch_add(1, Ordering::SeqCst);
            assert_eq!(seen, self.expected_order);
        }
    }

    let counter = Arc::new(AtomicU32::new(0));
    let first = CountProcessor {
        counter: counter.clone(),
        expected_order: 0,
    };
    let second = CountProcessor {
        counter: counter.clone(),
        expected_order: 1,
    };

    let mut schedule = Schedule::new();
    schedule
        .add_stage("PostUpdate")
        .add_processor_to_stage("Update", first)
        .add_processor_to_stage("PostUpdate", second);

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    assert_eq!(counter.load(Ordering::SeqCst), 2);
}
// A spawn deferred through `Commands` in the "Update" stage must be visible
// to a processor running in the later "PostUpdate" stage.
#[test]
fn commands_applied_between_stages() {
    // Queues one deferred spawn.
    struct SpawnProcessor;
    impl Processor for SpawnProcessor {
        fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn writes(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn run(&mut self, ctx: &mut ProcessorContext) {
            ctx.commands().spawn((Pos(42, 42),));
        }
    }

    // Asserts that at least one entity already exists when it runs.
    struct CheckProcessor;
    impl Processor for CheckProcessor {
        fn reads(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
            let pos = registry.id_for::<Pos>();
            vec![pos]
        }
        fn writes(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
            Vec::new()
        }
        fn run(&mut self, ctx: &mut ProcessorContext) {
            let entities = ctx.universe().entity_count();
            assert!(entities > 0, "deferred spawn should have been applied");
        }
    }

    let mut schedule = Schedule::new();
    schedule
        .add_stage("PostUpdate")
        .add_processor_to_stage("Update", SpawnProcessor)
        .add_processor_to_stage("PostUpdate", CheckProcessor);

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    assert_eq!(universe.entity_count(), 1);
}
// Two processors that write different components are placed in the same wave
// and run through the multi-processor (rayon) path; both must have run by the
// time the schedule returns.
//
// The processors are registered inside a group on purpose: processors added
// directly to a stage are run one at a time by `run_stage` and never reach
// the wave scheduler, so they would not exercise the parallel path at all.
#[test]
fn parallel_processors_disjoint_writes() {
    let counter = Arc::new(AtomicU32::new(0));

    struct WritePosProcessor {
        counter: Arc<AtomicU32>,
    }
    impl Processor for WritePosProcessor {
        fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> { vec![] }
        fn writes(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
            vec![registry.id_for::<Pos>()]
        }
        fn run(&mut self, _ctx: &mut ProcessorContext) {
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    struct WriteHpProcessor {
        counter: Arc<AtomicU32>,
    }
    impl Processor for WriteHpProcessor {
        fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> { vec![] }
        fn writes(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
            vec![registry.id_for::<Hp>()]
        }
        fn run(&mut self, _ctx: &mut ProcessorContext) {
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    let mut schedule = Schedule::new();
    schedule.add_group("Parallel", |g| {
        g.add_processor(WritePosProcessor { counter: counter.clone() });
        g.add_processor(WriteHpProcessor { counter: counter.clone() });
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    assert_eq!(counter.load(Ordering::SeqCst), 2);
}
// Test processor with a configurable name and ordering constraints. It
// declares no component access; each run takes the next value of the shared
// counter and records `(name, value)` in the shared log.
struct OrderedProcessor {
    name: &'static str,
    after: Vec<&'static str>,
    before: Vec<&'static str>,
    counter: Arc<AtomicU32>,
    observed: Arc<std::sync::Mutex<Vec<(&'static str, u32)>>>,
}

impl Processor for OrderedProcessor {
    fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
        Vec::new()
    }

    fn writes(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> {
        Vec::new()
    }

    fn run(&mut self, _ctx: &mut ProcessorContext) {
        let ticket = self.counter.fetch_add(1, Ordering::SeqCst);
        let mut log = self.observed.lock().unwrap();
        log.push((self.name, ticket));
    }

    fn processor_name(&self) -> &'static str {
        self.name
    }

    fn execute_after(&self) -> Vec<&'static str> {
        self.after.to_vec()
    }

    fn execute_before(&self) -> Vec<&'static str> {
        self.before.to_vec()
    }
}
// B declares `execute_after = ["A"]` and is registered first; A must still
// run before B.
#[test]
fn explicit_after_ordering() {
    let counter = Arc::new(AtomicU32::new(0));
    let observed = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Builds a named processor that must run after everything in `after`.
    let ordered = |name: &'static str, after: Vec<&'static str>| OrderedProcessor {
        name,
        after,
        before: vec![],
        counter: counter.clone(),
        observed: observed.clone(),
    };

    let mut schedule = Schedule::new();
    schedule.add_group("G", |group| {
        group.add_processor(ordered("B", vec!["A"]));
        group.add_processor(ordered("A", vec![]));
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    let log = observed.lock().unwrap();
    let position = |name: &str| log.iter().find(|entry| entry.0 == name).unwrap().1;
    let (a_order, b_order) = (position("A"), position("B"));
    assert!(a_order < b_order, "A must run before B (A={}, B={})", a_order, b_order);
}
// A declares `execute_before = ["B"]` and is registered second; A must still
// run before B.
#[test]
fn explicit_before_ordering() {
    let counter = Arc::new(AtomicU32::new(0));
    let observed = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Builds a named processor that must run before everything in `before`.
    let ordered = |name: &'static str, before: Vec<&'static str>| OrderedProcessor {
        name,
        after: vec![],
        before,
        counter: counter.clone(),
        observed: observed.clone(),
    };

    let mut schedule = Schedule::new();
    schedule.add_group("G", |group| {
        group.add_processor(ordered("B", vec![]));
        group.add_processor(ordered("A", vec!["B"]));
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    let log = observed.lock().unwrap();
    let position = |name: &str| log.iter().find(|entry| entry.0 == name).unwrap().1;
    let (a_order, b_order) = (position("A"), position("B"));
    assert!(a_order < b_order, "A must run before B (A={}, B={})", a_order, b_order);
}
// C after B and B after A, registered as C, A, B: the run order must be
// A, then B, then C.
#[test]
fn transitive_ordering() {
    let counter = Arc::new(AtomicU32::new(0));
    let observed = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Builds a named processor that must run after everything in `after`.
    let ordered = |name: &'static str, after: Vec<&'static str>| OrderedProcessor {
        name,
        after,
        before: vec![],
        counter: counter.clone(),
        observed: observed.clone(),
    };

    let mut schedule = Schedule::new();
    schedule.add_group("G", |group| {
        group.add_processor(ordered("C", vec!["B"]));
        group.add_processor(ordered("A", vec![]));
        group.add_processor(ordered("B", vec!["A"]));
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    let log = observed.lock().unwrap();
    let position = |name: &str| log.iter().find(|entry| entry.0 == name).unwrap().1;
    let (a, b, c) = (position("A"), position("B"), position("C"));
    assert!(a < b, "A before B");
    assert!(b < c, "B before C");
}
// A after B together with B after A cannot be satisfied; running the
// schedule must panic with the cycle message.
#[test]
#[should_panic(expected = "cycle detected")]
fn cycle_detection() {
    let counter = Arc::new(AtomicU32::new(0));
    let observed = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Builds a named processor that must run after everything in `after`.
    let ordered = |name: &'static str, after: Vec<&'static str>| OrderedProcessor {
        name,
        after,
        before: vec![],
        counter: counter.clone(),
        observed: observed.clone(),
    };

    let mut schedule = Schedule::new();
    schedule.add_group("G", |group| {
        group.add_processor(ordered("A", vec!["B"]));
        group.add_processor(ordered("B", vec!["A"]));
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);
}
// Combines both ordering mechanisms inside one group:
//   * A writes `Pos` and B reads `Pos`, so the conflict check must keep B out
//     of A's wave;
//   * C declares `execute_after = ["A"]`, so it must land in a later wave
//     than A.
//
// The processors are registered inside a group on purpose: processors added
// directly to a stage are run strictly in insertion order by `run_stage`, so
// neither the conflict partitioning nor the `execute_after` handling would be
// exercised and the assertions below would hold trivially.
#[test]
fn mixed_explicit_and_conflict_ordering() {
    let counter = Arc::new(AtomicU32::new(0));
    let observed = Arc::new(std::sync::Mutex::new(Vec::new()));

    // Named processor that writes `Pos` and records when it ran.
    struct WritePosOrdered {
        name: &'static str,
        after: Vec<&'static str>,
        before: Vec<&'static str>,
        counter: Arc<AtomicU32>,
        observed: Arc<std::sync::Mutex<Vec<(&'static str, u32)>>>,
    }
    impl Processor for WritePosOrdered {
        fn reads(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> { vec![] }
        fn writes(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
            vec![registry.id_for::<Pos>()]
        }
        fn run(&mut self, _ctx: &mut ProcessorContext) {
            let order = self.counter.fetch_add(1, Ordering::SeqCst);
            self.observed.lock().unwrap().push((self.name, order));
        }
        fn processor_name(&self) -> &'static str { self.name }
        fn execute_after(&self) -> Vec<&'static str> { self.after.clone() }
        fn execute_before(&self) -> Vec<&'static str> { self.before.clone() }
    }

    // Named processor that reads `Pos` and records when it ran.
    struct ReadPosOrdered {
        name: &'static str,
        counter: Arc<AtomicU32>,
        observed: Arc<std::sync::Mutex<Vec<(&'static str, u32)>>>,
    }
    impl Processor for ReadPosOrdered {
        fn reads(&self, registry: &ComponentIdRegistry) -> Vec<ComponentId> {
            vec![registry.id_for::<Pos>()]
        }
        fn writes(&self, _: &ComponentIdRegistry) -> Vec<ComponentId> { vec![] }
        fn run(&mut self, _ctx: &mut ProcessorContext) {
            let order = self.counter.fetch_add(1, Ordering::SeqCst);
            self.observed.lock().unwrap().push((self.name, order));
        }
        fn processor_name(&self) -> &'static str { self.name }
    }

    let mut schedule = Schedule::new();
    schedule.add_group("Mixed", |g| {
        g.add_processor(WritePosOrdered {
            name: "A", after: vec![], before: vec![],
            counter: counter.clone(), observed: observed.clone(),
        });
        g.add_processor(ReadPosOrdered {
            name: "B",
            counter: counter.clone(), observed: observed.clone(),
        });
        g.add_processor(OrderedProcessor {
            name: "C", after: vec!["A"], before: vec![],
            counter: counter.clone(), observed: observed.clone(),
        });
    });

    let mut universe = Universe::new();
    schedule.run(&mut universe);

    let log = observed.lock().unwrap();
    let a = log.iter().find(|x| x.0 == "A").unwrap().1;
    let b = log.iter().find(|x| x.0 == "B").unwrap().1;
    let c = log.iter().find(|x| x.0 == "C").unwrap().1;
    assert!(a < b, "A before B (component conflict)");
    assert!(a < c, "A before C (explicit after)");
}
}