hecs_schedule/
schedule.rs

1use std::{
2    any::TypeId,
3    collections::HashMap,
4    fmt::{Debug, Display},
5    ops::{Deref, DerefMut},
6};
7
8use hecs::World;
9use smallvec::SmallVec;
10
11#[cfg(feature = "parallel")]
12use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
13
14use crate::{
15    borrow::{Borrows, MaybeWrite},
16    Access, CommandBuffer, Context, IntoData, Result, System, SystemName, Write,
17};
18
19#[derive(Default, Debug, Clone)]
20/// Holds information regarding batches
21pub struct BatchInfo<'a> {
22    batches: &'a [Batch],
23}
24
25impl<'a> Display for BatchInfo<'a> {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        writeln!(f, "Batches: ")?;
28        for batch in self.batches {
29            for system in &batch.systems {
30                writeln!(f, " - {}", system.name())?;
31            }
32            writeln!(f)?;
33        }
34
35        Ok(())
36    }
37}
38
39#[derive(Default)]
40/// Represents a unit of work with compatible borrows.
41pub struct Batch {
42    systems: SmallVec<[DynamicSystem; 8]>,
43    has_flush: bool,
44}
45
46impl Debug for Batch {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        let mut list = f.debug_list();
49
50        for system in self.systems() {
51            list.entry(&system.name());
52        }
53
54        list.finish()
55    }
56}
57
58impl Batch {
59    fn push(&mut self, system: DynamicSystem) {
60        self.systems.push(system)
61    }
62
63    /// Get a reference to the batch's systems.
64    pub fn systems(&self) -> &SmallVec<[DynamicSystem; 8]> {
65        &self.systems
66    }
67}
68
69impl Deref for Batch {
70    type Target = [DynamicSystem];
71
72    fn deref(&self) -> &Self::Target {
73        &self.systems
74    }
75}
76
77impl DerefMut for Batch {
78    fn deref_mut(&mut self) -> &mut Self::Target {
79        &mut self.systems
80    }
81}
82
83// Type erased boxed system
84#[doc(hidden)]
85pub struct DynamicSystem {
86    func: Box<dyn FnMut(&Context) -> Result<()> + Send>,
87    name: SystemName,
88    borrows: Borrows,
89}
90
91#[doc(hidden)]
92impl DynamicSystem {
93    fn new<S, Args, Ret>(mut system: S) -> Self
94    where
95        S: 'static + System<Args, Ret> + Send,
96    {
97        let borrows = S::borrows();
98        let name = system.name();
99        Self {
100            func: Box::new(move |context| system.execute(context)),
101            name,
102            borrows,
103        }
104    }
105
106    fn execute(&mut self, context: &Context) -> Result<()> {
107        (self.func)(context)
108    }
109
110    /// Get a reference to the dynamic system's name.
111    pub fn name(&self) -> &str {
112        self.name.as_ref()
113    }
114}
115
116/// A shedule represents a collections of system which will run with effects in
117/// a determined order.
118pub struct Schedule {
119    batches: Vec<Batch>,
120    cmd: CommandBuffer,
121}
122
123impl Schedule {
124    /// Creates a new schedule from provided batches.
125    pub fn new(batches: Vec<Batch>) -> Self {
126        Self {
127            batches,
128            cmd: Default::default(),
129        }
130    }
131
132    /// Returns information of how the schedule was split into batches
133    pub fn batch_info(&self) -> BatchInfo {
134        BatchInfo {
135            batches: &self.batches,
136        }
137    }
138
139    /// Creates a new [ScheduleBuilder]
140    pub fn builder() -> ScheduleBuilder {
141        ScheduleBuilder::default()
142    }
143
144    /// Executes the systems inside the schedule sequentially using the provided data, which
145    /// is a tuple of mutable references. Returns Err if any system fails.
146    ///
147    /// A commandbuffer is always available and will be flushed at the end.
148    pub fn execute_seq<D: IntoData<CommandBuffer>>(&mut self, data: D) -> Result<()> {
149        let data = unsafe { data.into_data(&mut self.cmd) };
150
151        let context = Context::new(&data);
152
153        self.batches.iter_mut().try_for_each(|batch| {
154            batch
155                .iter_mut()
156                .try_for_each(|system| system.execute(&context))
157        })
158    }
159
160    #[cfg(feature = "parallel")]
161    /// Executes the systems inside the schedule ina parallel using the provided data, which
162    /// is a tuple of mutable references. Returns Err if any system fails
163    ///
164    /// A commandbuffer is always available and will be flushed at the end.
165    pub fn execute<D: IntoData<CommandBuffer> + Send + Sync>(&mut self, data: D) -> Result<()> {
166        let data = unsafe { data.into_data(&mut self.cmd) };
167
168        let context = Context::new(&data);
169
170        self.batches.iter_mut().try_for_each(|batch| {
171            batch
172                .par_iter_mut()
173                .try_for_each(|system| system.execute(&context))
174        })
175    }
176
177    /// Get a reference to the schedule's cmd.
178    pub fn cmd(&self) -> &CommandBuffer {
179        &self.cmd
180    }
181
182    /// Get a mutable reference to the schedule's cmd.
183    pub fn cmd_mut(&mut self) -> &mut CommandBuffer {
184        &mut self.cmd
185    }
186}
187
188#[derive(Default)]
189/// Builder for incrementally constructing a schedule.
190pub struct ScheduleBuilder {
191    batches: Vec<Batch>,
192    current_batch: Batch,
193    current_borrows: HashMap<TypeId, Access>,
194}
195
196impl ScheduleBuilder {
197    /// Creates a new [ScheduleBuilder]
198    pub fn new() -> Self {
199        Default::default()
200    }
201
202    /// Add a system to the builder
203    pub fn add_system<Args, Ret, S>(&mut self, system: S) -> &mut Self
204    where
205        S: 'static + System<Args, Ret> + Send,
206    {
207        self.add_internal(DynamicSystem::new(system));
208        self
209    }
210
211    fn add_internal(&mut self, system: DynamicSystem) {
212        // Check borrow
213        let borrows = &system.borrows;
214
215        if !self.check_compatible(borrows) {
216            // Push and create a new batch
217            self.barrier();
218        }
219
220        self.add_borrows(borrows);
221        self.current_batch.push(system);
222    }
223
224    /// Append all system from `other` into self, leaving `other` empty.
225    /// This allows constructing smaller schedules in different modules and then
226    /// joining them together. Work will be paralellized between the two
227    /// schedules.
228    pub fn append(&mut self, other: &mut ScheduleBuilder) -> &mut Self {
229        other.barrier();
230
231        other.batches.drain(..).for_each(|mut batch| {
232            batch
233                .systems
234                .drain(..)
235                .for_each(|system| self.add_internal(system))
236        });
237
238        self
239    }
240
241    /// Inserts a barrier that will divide the schedule pararell execution in
242    /// two dependant halves.
243    ///
244    /// Usually this is not required, as the borrows of the system automatically
245    /// creates dependencies, but sometimes a manual dependency is needed for things
246    /// such as interior mutability or channels.
247    pub fn barrier(&mut self) -> &mut Self {
248        let batch = std::mem::take(&mut self.current_batch);
249
250        self.batches.push(batch);
251
252        self.current_borrows.clear();
253
254        self
255    }
256
257    /// Flush the commandbuffer and apply the commands to the world
258    pub fn flush(&mut self) -> &mut Self {
259        self.current_batch.has_flush = true;
260        self.add_system(flush_system)
261    }
262
263    fn add_borrows(&mut self, borrows: &Borrows) {
264        self.current_borrows
265            .extend(borrows.into_iter().map(|val| (val.id(), *val)))
266    }
267
268    /// Returns true if no borrows conflict with the current ones
269    fn check_compatible(&self, borrows: &Borrows) -> bool {
270        for borrow in borrows {
271            // Type is already borrowed
272            if let Some(curr) = self.current_borrows.get(&borrow.id()) {
273                // Already exclusively borrowed or new borrow is exlcusive
274                if curr.exclusive() || borrow.exclusive() {
275                    return false;
276                }
277            }
278        }
279
280        true
281    }
282
283    /// FLushes the commandbuffer and builds the schedule.
284    pub fn build(&mut self) -> Schedule {
285        self.flush();
286        // Push the current batch
287        self.barrier();
288
289        let builder = std::mem::take(self);
290
291        Schedule::new(builder.batches)
292    }
293}
294
295// Flushes the commandbuffer
296fn flush_system(mut world: MaybeWrite<World>, mut cmd: Write<CommandBuffer>) -> Result<()> {
297    if let Some(world) = world.option_mut() {
298        cmd.execute(world);
299    }
300    Ok(())
301}