oxygengine_core/ecs/pipeline/
mod.rs

1pub mod engines;
2
3use crate::ecs::{AccessType, System, Universe};
4pub use hecs::*;
5use std::{any::TypeId, collections::HashSet, marker::PhantomData};
6use typid::ID;
7
8pub type PipelineId = ID<PhantomData<dyn PipelineEngine + Send + Sync>>;
9
10#[derive(Debug, Clone, PartialEq)]
11pub enum PipelineBuilderError {
12    DependencyNotFound(String),
13}
14
15#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
16pub enum PipelineLayer {
17    Pre,
18    Main,
19    Post,
20}
21
22impl Default for PipelineLayer {
23    fn default() -> Self {
24        Self::Main
25    }
26}
27
28pub trait PipelineBuilder: Sized {
29    fn add_system_on_layer<AT: AccessType>(
30        &mut self,
31        name: &str,
32        system: System,
33        dependencies: &[&str],
34        layer: PipelineLayer,
35        lock_on_single_thread: bool,
36    ) -> Result<(), PipelineBuilderError>;
37
38    fn add_system<AT: AccessType>(
39        &mut self,
40        name: &str,
41        system: System,
42        dependencies: &[&str],
43    ) -> Result<(), PipelineBuilderError> {
44        self.add_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, false)
45    }
46
47    fn add_system_on_single_thread<AT: AccessType>(
48        &mut self,
49        name: &str,
50        system: System,
51        dependencies: &[&str],
52    ) -> Result<(), PipelineBuilderError> {
53        self.add_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, true)
54    }
55
56    fn with_system_on_layer<AT: AccessType>(
57        mut self,
58        name: &str,
59        system: System,
60        dependencies: &[&str],
61        layer: PipelineLayer,
62        lock_on_single_thread: bool,
63    ) -> Result<Self, PipelineBuilderError> {
64        self.add_system_on_layer::<AT>(name, system, dependencies, layer, lock_on_single_thread)?;
65        Ok(self)
66    }
67
68    fn with_system<AT: AccessType>(
69        self,
70        name: &str,
71        system: System,
72        dependencies: &[&str],
73    ) -> Result<Self, PipelineBuilderError> {
74        self.with_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, false)
75    }
76
77    fn with_system_on_single_thread<AT: AccessType>(
78        self,
79        name: &str,
80        system: System,
81        dependencies: &[&str],
82    ) -> Result<Self, PipelineBuilderError> {
83        self.with_system_on_layer::<AT>(name, system, dependencies, PipelineLayer::Main, true)
84    }
85
86    fn graph(self) -> PipelineGraph;
87
88    fn build<T>(self) -> T
89    where
90        T: PipelineEngine + Default,
91    {
92        self.build_with_engine(T::default())
93    }
94
95    fn build_with_engine<T>(self, mut engine: T) -> T
96    where
97        T: PipelineEngine,
98    {
99        engine.setup(self.graph());
100        engine
101    }
102}
103
104#[derive(Debug, Clone, PartialEq)]
105pub(crate) struct PipelineBuilderMeta {
106    name: String,
107    system: PipelineGraphSystem,
108}
109
110#[derive(Debug, Clone, PartialEq)]
111pub struct ParallelPipelineBuilder {
112    parallel_jobs: usize,
113    systems_pre: Vec<Vec<PipelineBuilderMeta>>,
114    systems_main: Vec<Vec<PipelineBuilderMeta>>,
115    systems_post: Vec<Vec<PipelineBuilderMeta>>,
116}
117
118impl Default for ParallelPipelineBuilder {
119    #[cfg(not(feature = "parallel"))]
120    fn default() -> Self {
121        Self::new(1)
122    }
123
124    #[cfg(feature = "parallel")]
125    fn default() -> Self {
126        Self::new(rayon::current_num_threads())
127    }
128}
129
130impl ParallelPipelineBuilder {
131    pub fn new(parallel_jobs: usize) -> Self {
132        Self {
133            parallel_jobs: parallel_jobs.max(1),
134            systems_pre: Default::default(),
135            systems_main: Default::default(),
136            systems_post: Default::default(),
137        }
138    }
139}
140
141impl PipelineBuilder for ParallelPipelineBuilder {
142    fn add_system_on_layer<AT: AccessType>(
143        &mut self,
144        name: &str,
145        system: System,
146        dependencies: &[&str],
147        layer: PipelineLayer,
148        lock_on_single_thread: bool,
149    ) -> Result<(), PipelineBuilderError> {
150        let systems = match layer {
151            PipelineLayer::Pre => &mut self.systems_pre,
152            PipelineLayer::Main => &mut self.systems_main,
153            PipelineLayer::Post => &mut self.systems_post,
154        };
155        for dep in dependencies {
156            if !systems
157                .iter()
158                .any(|g| g.iter().any(|meta| meta.name.as_str() == *dep))
159            {
160                return Err(PipelineBuilderError::DependencyNotFound(dep.to_string()));
161            }
162        }
163        let (reads, writes) = AT::get_types();
164        if self.parallel_jobs == 1 {
165            systems.push(vec![PipelineBuilderMeta {
166                name: name.to_owned(),
167                system: PipelineGraphSystem {
168                    system,
169                    reads,
170                    writes,
171                    layer,
172                    lock_on_single_thread,
173                },
174            }]);
175            return Ok(());
176        }
177        let mut dependencies_left = dependencies.iter().copied().collect::<HashSet<_>>();
178        for group in systems.iter_mut() {
179            if !dependencies_left.is_empty() {
180                for meta in group {
181                    dependencies_left.remove(meta.name.as_str());
182                }
183            } else if group.len() < self.parallel_jobs
184                && group
185                    .iter()
186                    .all(|meta| meta.system.writes.is_disjoint(&writes))
187            {
188                group.push(PipelineBuilderMeta {
189                    name: name.to_owned(),
190                    system: PipelineGraphSystem {
191                        system,
192                        reads,
193                        writes,
194                        layer,
195                        lock_on_single_thread,
196                    },
197                });
198                return Ok(());
199            }
200        }
201        systems.push(vec![PipelineBuilderMeta {
202            name: name.to_owned(),
203            system: PipelineGraphSystem {
204                system,
205                reads,
206                writes,
207                layer,
208                lock_on_single_thread,
209            },
210        }]);
211        Ok(())
212    }
213
214    fn graph(self) -> PipelineGraph {
215        PipelineGraph::Sequence(
216            self.systems_pre
217                .into_iter()
218                .map(|group| {
219                    PipelineGraph::Parallel(
220                        group
221                            .into_iter()
222                            .map(|meta| PipelineGraph::System(meta.system))
223                            .collect(),
224                    )
225                })
226                .chain(self.systems_main.into_iter().map(|group| {
227                    PipelineGraph::Parallel(
228                        group
229                            .into_iter()
230                            .map(|meta| PipelineGraph::System(meta.system))
231                            .collect(),
232                    )
233                }))
234                .chain(self.systems_post.into_iter().map(|group| {
235                    PipelineGraph::Parallel(
236                        group
237                            .into_iter()
238                            .map(|meta| PipelineGraph::System(meta.system))
239                            .collect(),
240                    )
241                }))
242                .collect(),
243        )
244    }
245}
246
247#[derive(Debug, Default, Clone, PartialEq)]
248pub struct LinearPipelineBuilder {
249    systems_pre: Vec<PipelineBuilderMeta>,
250    systems_main: Vec<PipelineBuilderMeta>,
251    systems_post: Vec<PipelineBuilderMeta>,
252}
253
254impl PipelineBuilder for LinearPipelineBuilder {
255    fn add_system_on_layer<AT: AccessType>(
256        &mut self,
257        name: &str,
258        system: System,
259        dependencies: &[&str],
260        layer: PipelineLayer,
261        lock_on_single_thread: bool,
262    ) -> Result<(), PipelineBuilderError> {
263        let systems = match layer {
264            PipelineLayer::Pre => &mut self.systems_pre,
265            PipelineLayer::Main => &mut self.systems_main,
266            PipelineLayer::Post => &mut self.systems_post,
267        };
268        for dep in dependencies {
269            if !systems.iter().any(|meta| meta.name.as_str() == *dep) {
270                return Err(PipelineBuilderError::DependencyNotFound(dep.to_string()));
271            }
272        }
273        let (reads, writes) = AT::get_types();
274        systems.push(PipelineBuilderMeta {
275            name: name.to_string(),
276            system: PipelineGraphSystem {
277                system,
278                reads,
279                writes,
280                layer,
281                lock_on_single_thread,
282            },
283        });
284        Ok(())
285    }
286
287    fn graph(self) -> PipelineGraph {
288        PipelineGraph::Sequence(
289            self.systems_pre
290                .into_iter()
291                .map(|meta| PipelineGraph::System(meta.system))
292                .chain(
293                    self.systems_main
294                        .into_iter()
295                        .map(|meta| PipelineGraph::System(meta.system)),
296                )
297                .chain(
298                    self.systems_post
299                        .into_iter()
300                        .map(|meta| PipelineGraph::System(meta.system)),
301                )
302                .collect(),
303        )
304    }
305}
306
307#[derive(Clone)]
308pub struct PipelineGraphSystem {
309    pub system: System,
310    pub reads: HashSet<TypeId>,
311    pub writes: HashSet<TypeId>,
312    pub layer: PipelineLayer,
313    pub lock_on_single_thread: bool,
314}
315
316impl PartialEq for PipelineGraphSystem {
317    fn eq(&self, other: &Self) -> bool {
318        let a = self.system as *const ();
319        let b = other.system as *const ();
320        a == b && self.reads == other.reads && self.writes == other.writes
321    }
322}
323
324impl std::fmt::Debug for PipelineGraphSystem {
325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326        f.debug_struct("PipelineGraphSystem")
327            .field("system", &format!("{:p}", self.system as *const ()))
328            .field("reads", &self.reads)
329            .field("writes", &self.writes)
330            .field("lock_on_single_thread", &self.lock_on_single_thread)
331            .finish()
332    }
333}
334
335#[derive(Debug, Clone, PartialEq)]
336pub enum PipelineGraph {
337    System(PipelineGraphSystem),
338    Sequence(Vec<PipelineGraph>),
339    Parallel(Vec<PipelineGraph>),
340}
341
342impl PipelineGraph {
343    pub fn is_lock_on_single_thread(&self) -> bool {
344        matches!(
345            self,
346            Self::System(PipelineGraphSystem {
347                lock_on_single_thread: true,
348                ..
349            })
350        )
351    }
352}
353
354pub trait PipelineEngine {
355    fn setup(&mut self, graph: PipelineGraph);
356    fn run(&self, universe: &mut Universe);
357}
358
359#[cfg(test)]
360mod tests {
361    use super::*;
362    use crate::ecs::pipeline::{
363        engines::{default::DefaultPipelineEngine, sequence::SequencePipelineEngine},
364        LinearPipelineBuilder, ParallelPipelineBuilder,
365    };
366
367    macro_rules! types {
368        () => (std::collections::HashSet::new());
369        ( $($p:path),* ) => {
370            {
371                #[allow(unused_mut)]
372                let mut result = std::collections::HashSet::new();
373                $( result.insert(std::any::TypeId::of::<$p>()); )*
374                result
375            }
376        }
377    }
378
379    #[test]
380    fn test_pipeline_builder() {
381        struct A;
382        struct B;
383        struct C;
384
385        fn system_a(_: &mut Universe) {}
386        fn system_b(_: &mut Universe) {}
387        fn system_c(_: &mut Universe) {}
388
389        let builder = ParallelPipelineBuilder::new(8)
390            .with_system::<&mut A>("a", system_a, &[])
391            .unwrap()
392            .with_system::<&mut B>("b", system_b, &[])
393            .unwrap()
394            .with_system::<(&mut A, &mut B)>("c", system_c, &[])
395            .unwrap()
396            .with_system::<&mut C>("cc", system_c, &["a", "b"])
397            .unwrap()
398            .with_system::<()>("ccc", system_c, &[])
399            .unwrap();
400        assert_eq!(
401            builder,
402            ParallelPipelineBuilder {
403                parallel_jobs: 8,
404                systems_pre: vec![],
405                systems_main: vec![
406                    vec![
407                        PipelineBuilderMeta {
408                            name: "a".to_owned(),
409                            system: PipelineGraphSystem {
410                                system: system_a,
411                                reads: types!(),
412                                writes: types!(A),
413                                layer: PipelineLayer::Main,
414                                lock_on_single_thread: false,
415                            },
416                        },
417                        PipelineBuilderMeta {
418                            name: "b".to_owned(),
419                            system: PipelineGraphSystem {
420                                system: system_b,
421                                reads: types!(),
422                                writes: types!(B),
423                                layer: PipelineLayer::Main,
424                                lock_on_single_thread: false,
425                            },
426                        },
427                        PipelineBuilderMeta {
428                            name: "ccc".to_owned(),
429                            system: PipelineGraphSystem {
430                                system: system_c,
431                                reads: types!(),
432                                writes: types!(),
433                                layer: PipelineLayer::Main,
434                                lock_on_single_thread: false,
435                            },
436                        },
437                    ],
438                    vec![
439                        PipelineBuilderMeta {
440                            name: "c".to_owned(),
441                            system: PipelineGraphSystem {
442                                system: system_c,
443                                reads: types!(),
444                                writes: types!(A, B),
445                                layer: PipelineLayer::Main,
446                                lock_on_single_thread: false,
447                            },
448                        },
449                        PipelineBuilderMeta {
450                            name: "cc".to_owned(),
451                            system: PipelineGraphSystem {
452                                system: system_c,
453                                reads: types!(),
454                                writes: types!(C),
455                                layer: PipelineLayer::Main,
456                                lock_on_single_thread: false,
457                            },
458                        },
459                    ],
460                ],
461                systems_post: vec![],
462            }
463        );
464        assert_eq!(
465            builder.clone().graph(),
466            PipelineGraph::Sequence(vec![
467                PipelineGraph::Parallel(vec![
468                    PipelineGraph::System(PipelineGraphSystem {
469                        system: system_a,
470                        reads: types!(),
471                        writes: types!(A),
472                        layer: PipelineLayer::Main,
473                        lock_on_single_thread: false,
474                    }),
475                    PipelineGraph::System(PipelineGraphSystem {
476                        system: system_b,
477                        reads: types!(),
478                        writes: types!(B),
479                        layer: PipelineLayer::Main,
480                        lock_on_single_thread: false,
481                    }),
482                    PipelineGraph::System(PipelineGraphSystem {
483                        system: system_c,
484                        reads: types!(),
485                        writes: types!(),
486                        layer: PipelineLayer::Main,
487                        lock_on_single_thread: false,
488                    }),
489                ]),
490                PipelineGraph::Parallel(vec![
491                    PipelineGraph::System(PipelineGraphSystem {
492                        system: system_c,
493                        reads: types!(),
494                        writes: types!(A, B),
495                        layer: PipelineLayer::Main,
496                        lock_on_single_thread: false,
497                    }),
498                    PipelineGraph::System(PipelineGraphSystem {
499                        system: system_c,
500                        reads: types!(),
501                        writes: types!(C),
502                        layer: PipelineLayer::Main,
503                        lock_on_single_thread: false,
504                    }),
505                ]),
506            ])
507        );
508        assert_eq!(
509            builder.clone().build::<SequencePipelineEngine>(),
510            SequencePipelineEngine {
511                systems: vec![system_a, system_b, system_c, system_c, system_c,],
512            }
513        );
514        assert_eq!(
515            builder.clone().build::<DefaultPipelineEngine>(),
516            DefaultPipelineEngine {
517                parallel: false,
518                graph: Some(PipelineGraph::Sequence(vec![
519                    PipelineGraph::Parallel(vec![
520                        PipelineGraph::System(PipelineGraphSystem {
521                            system: system_a,
522                            reads: types!(),
523                            writes: types!(A),
524                            layer: PipelineLayer::Main,
525                            lock_on_single_thread: false,
526                        }),
527                        PipelineGraph::System(PipelineGraphSystem {
528                            system: system_b,
529                            reads: types!(),
530                            writes: types!(B),
531                            layer: PipelineLayer::Main,
532                            lock_on_single_thread: false,
533                        }),
534                        PipelineGraph::System(PipelineGraphSystem {
535                            system: system_c,
536                            reads: types!(),
537                            writes: types!(),
538                            layer: PipelineLayer::Main,
539                            lock_on_single_thread: false,
540                        }),
541                    ]),
542                    PipelineGraph::Parallel(vec![
543                        PipelineGraph::System(PipelineGraphSystem {
544                            system: system_c,
545                            reads: types!(),
546                            writes: types!(A, B),
547                            layer: PipelineLayer::Main,
548                            lock_on_single_thread: false,
549                        }),
550                        PipelineGraph::System(PipelineGraphSystem {
551                            system: system_c,
552                            reads: types!(),
553                            writes: types!(C),
554                            layer: PipelineLayer::Main,
555                            lock_on_single_thread: false,
556                        }),
557                    ]),
558                ])),
559            }
560        );
561
562        let builder = LinearPipelineBuilder::default()
563            .with_system::<&mut A>("a", system_a, &[])
564            .unwrap()
565            .with_system::<&mut B>("b", system_b, &[])
566            .unwrap()
567            .with_system::<(&mut A, &mut B)>("c", system_c, &[])
568            .unwrap()
569            .with_system::<&mut C>("cc", system_c, &["a", "b"])
570            .unwrap()
571            .with_system::<()>("ccc", system_c, &[])
572            .unwrap();
573        assert_eq!(
574            builder,
575            LinearPipelineBuilder {
576                systems_pre: vec![],
577                systems_main: vec![
578                    PipelineBuilderMeta {
579                        name: "a".to_owned(),
580                        system: PipelineGraphSystem {
581                            system: system_a,
582                            reads: types!(),
583                            writes: types!(A),
584                            layer: PipelineLayer::Main,
585                            lock_on_single_thread: false,
586                        },
587                    },
588                    PipelineBuilderMeta {
589                        name: "b".to_owned(),
590                        system: PipelineGraphSystem {
591                            system: system_b,
592                            reads: types!(),
593                            writes: types!(B),
594                            layer: PipelineLayer::Main,
595                            lock_on_single_thread: false,
596                        },
597                    },
598                    PipelineBuilderMeta {
599                        name: "c".to_owned(),
600                        system: PipelineGraphSystem {
601                            system: system_c,
602                            reads: types!(),
603                            writes: types!(A, B),
604                            layer: PipelineLayer::Main,
605                            lock_on_single_thread: false,
606                        },
607                    },
608                    PipelineBuilderMeta {
609                        name: "cc".to_owned(),
610                        system: PipelineGraphSystem {
611                            system: system_c,
612                            reads: types!(),
613                            writes: types!(C),
614                            layer: PipelineLayer::Main,
615                            lock_on_single_thread: false,
616                        },
617                    },
618                    PipelineBuilderMeta {
619                        name: "ccc".to_owned(),
620                        system: PipelineGraphSystem {
621                            system: system_c,
622                            reads: types!(),
623                            writes: types!(),
624                            layer: PipelineLayer::Main,
625                            lock_on_single_thread: false,
626                        },
627                    },
628                ],
629                systems_post: vec![],
630            }
631        );
632        assert_eq!(
633            builder.clone().graph(),
634            PipelineGraph::Sequence(vec![
635                PipelineGraph::System(PipelineGraphSystem {
636                    system: system_a,
637                    reads: types!(),
638                    writes: types!(A),
639                    layer: PipelineLayer::Main,
640                    lock_on_single_thread: false,
641                }),
642                PipelineGraph::System(PipelineGraphSystem {
643                    system: system_b,
644                    reads: types!(),
645                    writes: types!(B),
646                    layer: PipelineLayer::Main,
647                    lock_on_single_thread: false,
648                }),
649                PipelineGraph::System(PipelineGraphSystem {
650                    system: system_c,
651                    reads: types!(),
652                    writes: types!(A, B),
653                    layer: PipelineLayer::Main,
654                    lock_on_single_thread: false,
655                }),
656                PipelineGraph::System(PipelineGraphSystem {
657                    system: system_c,
658                    reads: types!(),
659                    writes: types!(C),
660                    layer: PipelineLayer::Main,
661                    lock_on_single_thread: false,
662                }),
663                PipelineGraph::System(PipelineGraphSystem {
664                    system: system_c,
665                    reads: types!(),
666                    writes: types!(),
667                    layer: PipelineLayer::Main,
668                    lock_on_single_thread: false,
669                }),
670            ])
671        );
672        assert_eq!(
673            builder.clone().build::<SequencePipelineEngine>(),
674            SequencePipelineEngine {
675                systems: vec![system_a, system_b, system_c, system_c, system_c,],
676            }
677        );
678        assert_eq!(
679            builder.clone().build::<DefaultPipelineEngine>(),
680            DefaultPipelineEngine {
681                parallel: false,
682                graph: Some(PipelineGraph::Sequence(vec![
683                    PipelineGraph::System(PipelineGraphSystem {
684                        system: system_a,
685                        reads: types!(),
686                        writes: types!(A),
687                        layer: PipelineLayer::Main,
688                        lock_on_single_thread: false,
689                    }),
690                    PipelineGraph::System(PipelineGraphSystem {
691                        system: system_b,
692                        reads: types!(),
693                        writes: types!(B),
694                        layer: PipelineLayer::Main,
695                        lock_on_single_thread: false,
696                    }),
697                    PipelineGraph::System(PipelineGraphSystem {
698                        system: system_c,
699                        reads: types!(),
700                        writes: types!(A, B),
701                        layer: PipelineLayer::Main,
702                        lock_on_single_thread: false,
703                    }),
704                    PipelineGraph::System(PipelineGraphSystem {
705                        system: system_c,
706                        reads: types!(),
707                        writes: types!(C),
708                        layer: PipelineLayer::Main,
709                        lock_on_single_thread: false,
710                    }),
711                    PipelineGraph::System(PipelineGraphSystem {
712                        system: system_c,
713                        reads: types!(),
714                        writes: types!(),
715                        layer: PipelineLayer::Main,
716                        lock_on_single_thread: false,
717                    }),
718                ])),
719            }
720        );
721    }
722}