1pub mod engines;
2
3use crate::ecs::{AccessType, System, Universe};
4pub use hecs::*;
5use std::{any::TypeId, collections::HashSet, marker::PhantomData};
6use typid::ID;
7
/// Identifier type for pipelines: a `typid` [`ID`] whose marker parameter is
/// the `dyn PipelineEngine + Send + Sync` trait-object type.
pub type PipelineId = ID<PhantomData<dyn PipelineEngine + Send + Sync>>;
9
/// Errors reported while registering systems in a [`PipelineBuilder`].
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn Error>` and printed for users.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineBuilderError {
    /// A declared dependency does not name any system already registered on
    /// the target layer. Carries the name of the missing dependency.
    DependencyNotFound(String),
}

impl std::fmt::Display for PipelineBuilderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DependencyNotFound(name) => {
                write!(f, "pipeline system dependency not found: {}", name)
            }
        }
    }
}

impl std::error::Error for PipelineBuilderError {}
14
/// Stage of the pipeline a system is registered on.
///
/// The builders in this module keep one system list per layer and emit them
/// into the graph in the order `Pre`, `Main`, `Post`. The derived ordering
/// follows the same declaration order.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum PipelineLayer {
    /// Emitted before the `Main` layer.
    Pre,
    /// The layer used by the `add_system` / `with_system` shorthands.
    Main,
    /// Emitted after the `Main` layer.
    Post,
}

/// The default layer is [`PipelineLayer::Main`].
impl Default for PipelineLayer {
    fn default() -> Self {
        PipelineLayer::Main
    }
}
27
28pub trait PipelineBuilder: Sized {
29 fn add_system_on_layer<AT: AccessType>(
30 &mut self,
31 name: &str,
32 system: System,
33 dependencies: &[&str],
34 layer: PipelineLayer,
35 lock_on_single_thread: bool,
36 ) -> Result<(), PipelineBuilderError>;
37
38 fn add_system<AT: AccessType>(
39 &mut self,
40 name: &str,
41 system: System,
42 dependencies: &[&str],
43 ) -> Result<(), PipelineBuilderError> {
44 self.add_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, false)
45 }
46
47 fn add_system_on_single_thread<AT: AccessType>(
48 &mut self,
49 name: &str,
50 system: System,
51 dependencies: &[&str],
52 ) -> Result<(), PipelineBuilderError> {
53 self.add_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, true)
54 }
55
56 fn with_system_on_layer<AT: AccessType>(
57 mut self,
58 name: &str,
59 system: System,
60 dependencies: &[&str],
61 layer: PipelineLayer,
62 lock_on_single_thread: bool,
63 ) -> Result<Self, PipelineBuilderError> {
64 self.add_system_on_layer::<AT>(name, system, dependencies, layer, lock_on_single_thread)?;
65 Ok(self)
66 }
67
68 fn with_system<AT: AccessType>(
69 self,
70 name: &str,
71 system: System,
72 dependencies: &[&str],
73 ) -> Result<Self, PipelineBuilderError> {
74 self.with_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, false)
75 }
76
77 fn with_system_on_single_thread<AT: AccessType>(
78 self,
79 name: &str,
80 system: System,
81 dependencies: &[&str],
82 ) -> Result<Self, PipelineBuilderError> {
83 self.with_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, true)
84 }
85
86 fn graph(self) -> PipelineGraph;
87
88 fn build<T>(self) -> T
89 where
90 T: PipelineEngine + Default,
91 {
92 self.build_with_engine(T::default())
93 }
94
95 fn build_with_engine<T>(self, mut engine: T) -> T
96 where
97 T: PipelineEngine,
98 {
99 engine.setup(self.graph());
100 engine
101 }
102}
103
/// Builder-internal record pairing a system's registered name with the graph
/// entry that will be emitted for it.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct PipelineBuilderMeta {
    /// Name the system was registered under; dependency lists are matched
    /// against it.
    name: String,
    /// The system together with its access and scheduling metadata.
    system: PipelineGraphSystem,
}
109
110#[derive(Debug, Clone, PartialEq)]
111pub struct ParallelPipelineBuilder {
112 parallel_jobs: usize,
113 systems_pre: Vec<Vec<PipelineBuilderMeta>>,
114 systems_main: Vec<Vec<PipelineBuilderMeta>>,
115 systems_post: Vec<Vec<PipelineBuilderMeta>>,
116}
117
118impl Default for ParallelPipelineBuilder {
119 #[cfg(not(feature = "parallel"))]
120 fn default() -> Self {
121 Self::new(1)
122 }
123
124 #[cfg(feature = "parallel")]
125 fn default() -> Self {
126 Self::new(rayon::current_num_threads())
127 }
128}
129
130impl ParallelPipelineBuilder {
131 pub fn new(parallel_jobs: usize) -> Self {
132 Self {
133 parallel_jobs: parallel_jobs.max(1),
134 systems_pre: Default::default(),
135 systems_main: Default::default(),
136 systems_post: Default::default(),
137 }
138 }
139}
140
141impl PipelineBuilder for ParallelPipelineBuilder {
142 fn add_system_on_layer<AT: AccessType>(
143 &mut self,
144 name: &str,
145 system: System,
146 dependencies: &[&str],
147 layer: PipelineLayer,
148 lock_on_single_thread: bool,
149 ) -> Result<(), PipelineBuilderError> {
150 let systems = match layer {
151 PipelineLayer::Pre => &mut self.systems_pre,
152 PipelineLayer::Main => &mut self.systems_main,
153 PipelineLayer::Post => &mut self.systems_post,
154 };
155 for dep in dependencies {
156 if !systems
157 .iter()
158 .any(|g| g.iter().any(|meta| meta.name.as_str() == *dep))
159 {
160 return Err(PipelineBuilderError::DependencyNotFound(dep.to_string()));
161 }
162 }
163 let (reads, writes) = AT::get_types();
164 if self.parallel_jobs == 1 {
165 systems.push(vec![PipelineBuilderMeta {
166 name: name.to_owned(),
167 system: PipelineGraphSystem {
168 system,
169 reads,
170 writes,
171 layer,
172 lock_on_single_thread,
173 },
174 }]);
175 return Ok(());
176 }
177 let mut dependencies_left = dependencies.iter().copied().collect::<HashSet<_>>();
178 for group in systems.iter_mut() {
179 if !dependencies_left.is_empty() {
180 for meta in group {
181 dependencies_left.remove(meta.name.as_str());
182 }
183 } else if group.len() < self.parallel_jobs
184 && group
185 .iter()
186 .all(|meta| meta.system.writes.is_disjoint(&writes))
187 {
188 group.push(PipelineBuilderMeta {
189 name: name.to_owned(),
190 system: PipelineGraphSystem {
191 system,
192 reads,
193 writes,
194 layer,
195 lock_on_single_thread,
196 },
197 });
198 return Ok(());
199 }
200 }
201 systems.push(vec![PipelineBuilderMeta {
202 name: name.to_owned(),
203 system: PipelineGraphSystem {
204 system,
205 reads,
206 writes,
207 layer,
208 lock_on_single_thread,
209 },
210 }]);
211 Ok(())
212 }
213
214 fn graph(self) -> PipelineGraph {
215 PipelineGraph::Sequence(
216 self.systems_pre
217 .into_iter()
218 .map(|group| {
219 PipelineGraph::Parallel(
220 group
221 .into_iter()
222 .map(|meta| PipelineGraph::System(meta.system))
223 .collect(),
224 )
225 })
226 .chain(self.systems_main.into_iter().map(|group| {
227 PipelineGraph::Parallel(
228 group
229 .into_iter()
230 .map(|meta| PipelineGraph::System(meta.system))
231 .collect(),
232 )
233 }))
234 .chain(self.systems_post.into_iter().map(|group| {
235 PipelineGraph::Parallel(
236 group
237 .into_iter()
238 .map(|meta| PipelineGraph::System(meta.system))
239 .collect(),
240 )
241 }))
242 .collect(),
243 )
244 }
245}
246
247#[derive(Debug, Default, Clone, PartialEq)]
248pub struct LinearPipelineBuilder {
249 systems_pre: Vec<PipelineBuilderMeta>,
250 systems_main: Vec<PipelineBuilderMeta>,
251 systems_post: Vec<PipelineBuilderMeta>,
252}
253
254impl PipelineBuilder for LinearPipelineBuilder {
255 fn add_system_on_layer<AT: AccessType>(
256 &mut self,
257 name: &str,
258 system: System,
259 dependencies: &[&str],
260 layer: PipelineLayer,
261 lock_on_single_thread: bool,
262 ) -> Result<(), PipelineBuilderError> {
263 let systems = match layer {
264 PipelineLayer::Pre => &mut self.systems_pre,
265 PipelineLayer::Main => &mut self.systems_main,
266 PipelineLayer::Post => &mut self.systems_post,
267 };
268 for dep in dependencies {
269 if !systems.iter().any(|meta| meta.name.as_str() == *dep) {
270 return Err(PipelineBuilderError::DependencyNotFound(dep.to_string()));
271 }
272 }
273 let (reads, writes) = AT::get_types();
274 systems.push(PipelineBuilderMeta {
275 name: name.to_string(),
276 system: PipelineGraphSystem {
277 system,
278 reads,
279 writes,
280 layer,
281 lock_on_single_thread,
282 },
283 });
284 Ok(())
285 }
286
287 fn graph(self) -> PipelineGraph {
288 PipelineGraph::Sequence(
289 self.systems_pre
290 .into_iter()
291 .map(|meta| PipelineGraph::System(meta.system))
292 .chain(
293 self.systems_main
294 .into_iter()
295 .map(|meta| PipelineGraph::System(meta.system)),
296 )
297 .chain(
298 self.systems_post
299 .into_iter()
300 .map(|meta| PipelineGraph::System(meta.system)),
301 )
302 .collect(),
303 )
304 }
305}
306
307#[derive(Clone)]
308pub struct PipelineGraphSystem {
309 pub system: System,
310 pub reads: HashSet<TypeId>,
311 pub writes: HashSet<TypeId>,
312 pub layer: PipelineLayer,
313 pub lock_on_single_thread: bool,
314}
315
316impl PartialEq for PipelineGraphSystem {
317 fn eq(&self, other: &Self) -> bool {
318 let a = self.system as *const ();
319 let b = other.system as *const ();
320 a == b && self.reads == other.reads && self.writes == other.writes
321 }
322}
323
324impl std::fmt::Debug for PipelineGraphSystem {
325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326 f.debug_struct("PipelineGraphSystem")
327 .field("system", &format!("{:p}", self.system as *const ()))
328 .field("reads", &self.reads)
329 .field("writes", &self.writes)
330 .field("lock_on_single_thread", &self.lock_on_single_thread)
331 .finish()
332 }
333}
334
335#[derive(Debug, Clone, PartialEq)]
336pub enum PipelineGraph {
337 System(PipelineGraphSystem),
338 Sequence(Vec<PipelineGraph>),
339 Parallel(Vec<PipelineGraph>),
340}
341
342impl PipelineGraph {
343 pub fn is_lock_on_single_thread(&self) -> bool {
344 matches!(
345 self,
346 Self::System(PipelineGraphSystem {
347 lock_on_single_thread: true,
348 ..
349 })
350 )
351 }
352}
353
/// Runtime that takes a [`PipelineGraph`] and runs it against a `Universe`.
pub trait PipelineEngine {
    /// Installs the graph produced by a builder. `PipelineBuilder::build_with_engine`
    /// calls this once before handing the engine back to the caller.
    fn setup(&mut self, graph: PipelineGraph);
    /// Runs the pipeline against `universe`.
    // NOTE(review): presumably executes the systems from the graph given to
    // `setup`; the exact behavior is defined by each engine implementation.
    fn run(&self, universe: &mut Universe);
}
358
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ecs::pipeline::{
        engines::{default::DefaultPipelineEngine, sequence::SequencePipelineEngine},
        LinearPipelineBuilder, ParallelPipelineBuilder,
    };

    /// Expands to a `HashSet<TypeId>` holding the `TypeId` of every listed type.
    macro_rules! types {
        () => (std::collections::HashSet::new());
        ( $($p:path),* ) => {
            {
                #[allow(unused_mut)]
                let mut result = std::collections::HashSet::new();
                $( result.insert(std::any::TypeId::of::<$p>()); )*
                result
            }
        }
    }

    /// Expected graph payload for a system registered through `with_system`
    /// with a write-only access type: main layer, no reads, no lock.
    fn node(
        system: System,
        writes: std::collections::HashSet<std::any::TypeId>,
    ) -> PipelineGraphSystem {
        PipelineGraphSystem {
            system,
            reads: types!(),
            writes,
            layer: PipelineLayer::Main,
            lock_on_single_thread: false,
        }
    }

    /// Expected builder record for a system registered under `name`.
    fn meta(
        name: &str,
        system: System,
        writes: std::collections::HashSet<std::any::TypeId>,
    ) -> PipelineBuilderMeta {
        PipelineBuilderMeta {
            name: name.to_owned(),
            system: node(system, writes),
        }
    }

    /// Expected `PipelineGraph::System` leaf.
    fn leaf(
        system: System,
        writes: std::collections::HashSet<std::any::TypeId>,
    ) -> PipelineGraph {
        PipelineGraph::System(node(system, writes))
    }

    #[test]
    fn test_pipeline_builder() {
        struct A;
        struct B;
        struct C;

        fn system_a(_: &mut Universe) {}
        fn system_b(_: &mut Universe) {}
        fn system_c(_: &mut Universe) {}

        // Parallel builder: systems are packed into groups.
        let builder = ParallelPipelineBuilder::new(8)
            .with_system::<&mut A>("a", system_a, &[])
            .unwrap()
            .with_system::<&mut B>("b", system_b, &[])
            .unwrap()
            .with_system::<(&mut A, &mut B)>("c", system_c, &[])
            .unwrap()
            .with_system::<&mut C>("cc", system_c, &["a", "b"])
            .unwrap()
            .with_system::<()>("ccc", system_c, &[])
            .unwrap();
        assert_eq!(
            builder,
            ParallelPipelineBuilder {
                parallel_jobs: 8,
                systems_pre: vec![],
                systems_main: vec![
                    vec![
                        meta("a", system_a, types!(A)),
                        meta("b", system_b, types!(B)),
                        meta("ccc", system_c, types!()),
                    ],
                    vec![
                        meta("c", system_c, types!(A, B)),
                        meta("cc", system_c, types!(C)),
                    ],
                ],
                systems_post: vec![],
            }
        );
        let parallel_graph = PipelineGraph::Sequence(vec![
            PipelineGraph::Parallel(vec![
                leaf(system_a, types!(A)),
                leaf(system_b, types!(B)),
                leaf(system_c, types!()),
            ]),
            PipelineGraph::Parallel(vec![
                leaf(system_c, types!(A, B)),
                leaf(system_c, types!(C)),
            ]),
        ]);
        assert_eq!(builder.clone().graph(), parallel_graph);
        assert_eq!(
            builder.clone().build::<SequencePipelineEngine>(),
            SequencePipelineEngine {
                systems: vec![system_a, system_b, system_c, system_c, system_c,],
            }
        );
        assert_eq!(
            builder.clone().build::<DefaultPipelineEngine>(),
            DefaultPipelineEngine {
                parallel: false,
                graph: Some(parallel_graph),
            }
        );

        // Linear builder: systems stay in registration order.
        let builder = LinearPipelineBuilder::default()
            .with_system::<&mut A>("a", system_a, &[])
            .unwrap()
            .with_system::<&mut B>("b", system_b, &[])
            .unwrap()
            .with_system::<(&mut A, &mut B)>("c", system_c, &[])
            .unwrap()
            .with_system::<&mut C>("cc", system_c, &["a", "b"])
            .unwrap()
            .with_system::<()>("ccc", system_c, &[])
            .unwrap();
        assert_eq!(
            builder,
            LinearPipelineBuilder {
                systems_pre: vec![],
                systems_main: vec![
                    meta("a", system_a, types!(A)),
                    meta("b", system_b, types!(B)),
                    meta("c", system_c, types!(A, B)),
                    meta("cc", system_c, types!(C)),
                    meta("ccc", system_c, types!()),
                ],
                systems_post: vec![],
            }
        );
        let linear_graph = PipelineGraph::Sequence(vec![
            leaf(system_a, types!(A)),
            leaf(system_b, types!(B)),
            leaf(system_c, types!(A, B)),
            leaf(system_c, types!(C)),
            leaf(system_c, types!()),
        ]);
        assert_eq!(builder.clone().graph(), linear_graph);
        assert_eq!(
            builder.clone().build::<SequencePipelineEngine>(),
            SequencePipelineEngine {
                systems: vec![system_a, system_b, system_c, system_c, system_c,],
            }
        );
        assert_eq!(
            builder.clone().build::<DefaultPipelineEngine>(),
            DefaultPipelineEngine {
                parallel: false,
                graph: Some(linear_graph),
            }
        );
    }
}