1use std::{
10 any::Any,
11 collections::HashMap,
12 marker::PhantomData,
13 ops::{Deref, DerefMut},
14};
15
16use broomdog::{Loan, LoanMut};
17use dagga::Dag;
18use snafu::prelude::*;
19
20pub use broomdog::{BroomdogErr, TypeKey, TypeMap};
21pub use dagga::{DaggaError, Node};
22#[cfg(feature = "derive")]
23pub use moongraph_macros::Edges;
24
25#[cfg(feature = "tutorial")]
26mod tutorial_impl;
27
28#[cfg(feature = "tutorial")]
29pub use tutorial_impl::tutorial;
30
31#[cfg(feature = "parallel")]
32pub mod rayon_impl;
33
34#[derive(Debug, Snafu)]
36pub enum GraphError {
37 #[snafu(display("Error while running local node {error}"))]
38 RunningLocalNode { error: String },
39
40 #[snafu(display("Error scheduling the graph: {source}"))]
41 Scheduling { source: dagga::DaggaError },
42
43 #[snafu(display("Resource error: {source}"))]
44 Resource { source: broomdog::BroomdogErr },
45
46 #[snafu(display("Resource '{type_name}' is loaned"))]
47 ResourceLoaned { type_name: &'static str },
48
49 #[snafu(display("Missing resource '{name}'"))]
50 Missing { name: &'static str },
51
52 #[snafu(display("Encountered local function that was not provided or already run"))]
53 MissingLocal,
54
55 #[snafu(display("Node should be trimmed"))]
56 TrimNode,
57
58 #[snafu(display("Unrecoverable error while running node: {source}"))]
59 Other {
60 source: Box<dyn std::error::Error + Send + Sync + 'static>,
61 },
62}
63
64impl From<broomdog::BroomdogErr> for GraphError {
65 fn from(source: broomdog::BroomdogErr) -> Self {
66 GraphError::Resource { source }
67 }
68}
69
70impl GraphError {
71 pub fn other(err: impl std::error::Error + Send + Sync + 'static) -> Self {
72 GraphError::Other {
73 source: Box::new(err),
74 }
75 }
76}
77
78pub fn ok() -> Result<(), GraphError> {
80 Ok(())
81}
82
83pub fn end() -> Result<(), GraphError> {
85 Err(GraphError::TrimNode)
86}
87
88pub fn err(err: impl std::error::Error + Send + Sync + 'static) -> Result<(), GraphError> {
90 Err(GraphError::other(err))
91}
92
93pub type Resource = Box<dyn Any + Send + Sync>;
94pub type FnPrepare = dyn Fn(&mut TypeMap) -> Result<Resource, GraphError>;
95pub type FnMutRun = dyn FnMut(Resource) -> Result<Resource, GraphError> + Send + Sync;
96pub type FnSave = dyn Fn(Resource, &mut TypeMap) -> Result<(), GraphError>;
97
98pub struct Function {
104 prepare: Box<FnPrepare>,
105 run: Option<Box<FnMutRun>>,
106 save: Box<FnSave>,
107}
108
109impl Function {
110 pub fn run(
112 &mut self,
113 resources: Resource,
114 local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
115 ) -> Result<Resource, GraphError> {
116 if let Some(f) = self.run.as_mut() {
117 (f)(resources)
118 } else {
119 let local = local.take().context(MissingLocalSnafu)?;
120 (local)(resources)
121 }
122 }
123
124 pub fn new(
126 prepare: impl Fn(&mut TypeMap) -> Result<Resource, GraphError> + 'static,
127 run: impl Fn(Resource) -> Result<Resource, GraphError> + Send + Sync + 'static,
128 save: impl Fn(Resource, &mut TypeMap) -> Result<(), GraphError> + 'static,
129 ) -> Self {
130 Function {
131 prepare: Box::new(prepare),
132 run: Some(Box::new(run)),
133 save: Box::new(save),
134 }
135 }
136}
137
138fn missing_local(_: ()) -> Result<(), GraphError> {
139 Err(GraphError::MissingLocal)
140}
141
142pub trait Edges: Sized {
153 fn reads() -> Vec<TypeKey> {
155 vec![]
156 }
157
158 fn writes() -> Vec<TypeKey> {
160 vec![]
161 }
162
163 fn moves() -> Vec<TypeKey> {
165 vec![]
166 }
167
168 fn construct(resources: &mut TypeMap) -> Result<Self, GraphError>;
170}
171
172impl Edges for () {
173 fn construct(_: &mut TypeMap) -> Result<Self, GraphError> {
174 Ok(())
175 }
176}
177
178macro_rules! impl_edges {
179 ($($t:ident),+) => {
180 impl<$($t: Edges),+> Edges for ($($t,)+) {
181 fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
182 Ok((
183 $( $t::construct(resources)?, )+
184 ))
185 }
186
187 fn reads() -> Vec<TypeKey> {
188 vec![
189 $( $t::reads(), )+
190 ].concat()
191 }
192
193 fn writes() -> Vec<TypeKey> {
194 vec![
195 $( $t::writes(), )+
196 ].concat()
197 }
198
199 fn moves() -> Vec<TypeKey> {
200 vec![
201 $( $t::moves(), )+
202 ].concat()
203 }
204 }
205 }
206}
207
208impl_edges!(A);
209impl_edges!(A, B);
210impl_edges!(A, B, C);
211impl_edges!(A, B, C, D);
212impl_edges!(A, B, C, D, E);
213impl_edges!(A, B, C, D, E, F);
214impl_edges!(A, B, C, D, E, F, G);
215impl_edges!(A, B, C, D, E, F, G, H);
216impl_edges!(A, B, C, D, E, F, G, H, I);
217impl_edges!(A, B, C, D, E, F, G, H, I, J);
218impl_edges!(A, B, C, D, E, F, G, H, I, J, K);
219impl_edges!(A, B, C, D, E, F, G, H, I, J, K, L);
220
221pub trait NodeResults {
230 fn creates() -> Vec<TypeKey>;
232
233 fn save(self, resources: &mut TypeMap) -> Result<(), GraphError>;
236}
237
238impl NodeResults for () {
239 fn creates() -> Vec<TypeKey> {
240 vec![]
241 }
242
243 fn save(self, _: &mut TypeMap) -> Result<(), GraphError> {
244 Ok(())
245 }
246}
247
248macro_rules! impl_node_results {
249 ($(($t:ident, $n:tt)),+) => {
250 impl<$( $t : Any + Send + Sync ),+> NodeResults for ($($t,)+) {
251 fn creates() -> Vec<TypeKey> {
252 vec![$( TypeKey::new::<$t>() ),+]
253 }
254
255 fn save(self, resources: &mut TypeMap) -> Result<(), GraphError> {
256 $( let _ = resources.insert_value( self.$n ); )+
257 Ok(())
258 }
259 }
260 }
261}
262
263impl_node_results!((A, 0));
264impl_node_results!((A, 0), (B, 1));
265impl_node_results!((A, 0), (B, 1), (C, 2));
266impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3));
267impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3), (E, 4));
268impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3), (E, 4), (F, 5));
269impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3), (E, 4), (F, 5), (G, 6));
270impl_node_results!(
271 (A, 0),
272 (B, 1),
273 (C, 2),
274 (D, 3),
275 (E, 4),
276 (F, 5),
277 (G, 6),
278 (H, 7)
279);
280impl_node_results!(
281 (A, 0),
282 (B, 1),
283 (C, 2),
284 (D, 3),
285 (E, 4),
286 (F, 5),
287 (G, 6),
288 (H, 7),
289 (I, 8)
290);
291impl_node_results!(
292 (A, 0),
293 (B, 1),
294 (C, 2),
295 (D, 3),
296 (E, 4),
297 (F, 5),
298 (G, 6),
299 (H, 7),
300 (I, 8),
301 (J, 9)
302);
303impl_node_results!(
304 (A, 0),
305 (B, 1),
306 (C, 2),
307 (D, 3),
308 (E, 4),
309 (F, 5),
310 (G, 6),
311 (H, 7),
312 (I, 8),
313 (J, 9),
314 (K, 10)
315);
316impl_node_results!(
317 (A, 0),
318 (B, 1),
319 (C, 2),
320 (D, 3),
321 (E, 4),
322 (F, 5),
323 (G, 6),
324 (H, 7),
325 (I, 8),
326 (J, 9),
327 (K, 10),
328 (L, 11)
329);
330
331fn prepare<Input: Edges + Any + Send + Sync>(
332 resources: &mut TypeMap,
333) -> Result<Resource, GraphError> {
334 log::trace!("preparing input {}", std::any::type_name::<Input>());
335 let input = Input::construct(resources)?;
336 Ok(Box::new(input))
337}
338
339fn save<Output: NodeResults + Any + Send + Sync>(
340 creates: Resource,
341 resources: &mut TypeMap,
342) -> Result<(), GraphError> {
343 log::trace!("saving output {}", std::any::type_name::<Output>());
344 let creates = *creates.downcast::<Output>().unwrap();
345 creates.save(resources)
346}
347
348pub trait IsGraphNode<Input, Output> {
366 fn into_node(self) -> Node<Function, TypeKey>;
368}
369
370impl<
371 Input: Edges + Any + Send + Sync,
372 Output: NodeResults + Any + Send + Sync,
373 F: FnMut(Input) -> Result<Output, E> + Send + Sync + 'static,
374 E: Into<GraphError>,
375 > IsGraphNode<Input, Output> for F
376{
377 fn into_node(mut self) -> Node<Function, TypeKey> {
378 let prepare = Box::new(prepare::<Input>);
379 let save = Box::new(save::<Output>);
380
381 let inner = Box::new(move |resources: Resource| -> Result<Resource, GraphError> {
382 let input = *resources.downcast::<Input>().unwrap();
383 match (self)(input) {
384 Ok(creates) => Ok(Box::new(creates)),
385 Err(e) => Err(e.into()),
386 }
387 });
388 Node::new(Function {
389 prepare,
390 run: Some(inner),
391 save,
392 })
393 .with_reads(Input::reads())
394 .with_writes(Input::writes())
395 .with_moves(Input::moves())
396 .with_results(Output::creates())
397 }
398}
399
400pub struct Move<T> {
402 inner: T,
403}
404
405impl<T: Any + Send + Sync> Edges for Move<T> {
406 fn moves() -> Vec<TypeKey> {
407 vec![TypeKey::new::<T>()]
408 }
409
410 fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
411 let key = TypeKey::new::<T>();
412 let inner_loan = resources
413 .remove(&key)
414 .context(MissingSnafu { name: key.name() })?;
415 match inner_loan.into_owned(key.name()) {
416 Ok(value) => {
417 let box_t = value.downcast::<T>().unwrap();
419 Ok(Move { inner: *box_t })
420 }
421 Err(loan) => {
422 resources.insert(key, loan);
424 let err = ResourceLoanedSnafu {
425 type_name: std::any::type_name::<T>(),
426 }
427 .build();
428 log::error!("{err}");
429 Err(err)
430 }
431 }
432 }
433}
434
435impl<T> Move<T> {
436 pub fn into(self) -> T {
438 self.inner
439 }
440}
441
442impl<T> Deref for Move<T> {
443 type Target = T;
444
445 fn deref(&self) -> &Self::Target {
446 &self.inner
447 }
448}
449
450impl<T> DerefMut for Move<T> {
451 fn deref_mut(&mut self) -> &mut Self::Target {
452 &mut self.inner
453 }
454}
455
456impl<T: std::fmt::Display> std::fmt::Display for Move<T> {
457 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
458 self.inner.fmt(f)
459 }
460}
461
462pub trait Gen<T> {
464 fn generate() -> Option<T>;
465}
466
467pub struct SomeDefault;
470
471impl<T: Default> Gen<T> for SomeDefault {
472 fn generate() -> Option<T> {
473 Some(T::default())
474 }
475}
476
477pub struct NoDefault;
479
480impl<T> Gen<T> for NoDefault {
481 fn generate() -> Option<T> {
482 None
483 }
484}
485
486pub struct View<T, G: Gen<T> = SomeDefault> {
508 inner: Loan,
509 _phantom: PhantomData<(T, G)>,
510}
511
512impl<T: Any + Send + Sync, G: Gen<T>> Deref for View<T, G> {
513 type Target = T;
514
515 fn deref(&self) -> &Self::Target {
516 self.inner.downcast_ref().unwrap()
518 }
519}
520
521impl<T: Any + Send + Sync, G: Gen<T>> Edges for View<T, G> {
522 fn reads() -> Vec<TypeKey> {
523 vec![TypeKey::new::<T>()]
524 }
525
526 fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
527 let key = TypeKey::new::<T>();
528 let inner = match resources.loan(key).context(ResourceSnafu)? {
529 Some(inner) => inner,
530 None => {
531 let t = G::generate().context(MissingSnafu {
532 name: std::any::type_name::<T>(),
533 })?;
534 let _ = resources.insert_value(t).unwrap();
537 log::trace!("generated missing {}", std::any::type_name::<T>());
538 resources.loan(key).unwrap().unwrap()
540 }
541 };
542 Ok(View {
543 inner,
544 _phantom: PhantomData,
545 })
546 }
547}
548
549impl<T: std::fmt::Display + Any + Send + Sync, G: Gen<T>> std::fmt::Display for View<T, G> {
550 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551 let t: &T = self.inner.downcast_ref().unwrap();
552 t.fmt(f)
553 }
554}
555
556impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoIterator for &'a View<T, G>
557where
558 &'a T: IntoIterator,
559{
560 type Item = <<&'a T as IntoIterator>::IntoIter as Iterator>::Item;
561
562 type IntoIter = <&'a T as IntoIterator>::IntoIter;
563
564 fn into_iter(self) -> Self::IntoIter {
565 self.deref().into_iter()
566 }
567}
568
569pub struct ViewMut<T, G: Gen<T> = SomeDefault> {
591 inner: LoanMut,
592 _phantom: PhantomData<(T, G)>,
593}
594
595impl<T: Any + Send + Sync, G: Gen<T>> Deref for ViewMut<T, G> {
596 type Target = T;
597
598 fn deref(&self) -> &Self::Target {
599 self.inner.downcast_ref().unwrap()
601 }
602}
603
604impl<T: Any + Send + Sync, G: Gen<T>> DerefMut for ViewMut<T, G> {
605 fn deref_mut(&mut self) -> &mut Self::Target {
606 self.inner.downcast_mut().unwrap()
608 }
609}
610
611impl<T: Any + Send + Sync, G: Gen<T>> Edges for ViewMut<T, G> {
612 fn writes() -> Vec<TypeKey> {
613 vec![TypeKey::new::<T>()]
614 }
615
616 fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
617 let key = TypeKey::new::<T>();
618 let inner = match resources.loan_mut(key).context(ResourceSnafu)? {
619 Some(inner) => inner,
620 None => {
621 let t = G::generate().context(MissingSnafu {
622 name: std::any::type_name::<T>(),
623 })?;
624 let _ = resources.insert_value(t).unwrap();
627 log::trace!("generated missing {}", std::any::type_name::<T>());
628 resources.loan_mut(key).unwrap().unwrap()
630 }
631 };
632 Ok(ViewMut {
633 inner,
634 _phantom: PhantomData,
635 })
636 }
637}
638
639impl<T: std::fmt::Display + Any + Send + Sync, G: Gen<T>> std::fmt::Display for ViewMut<T, G> {
640 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 let t: &T = self.inner.downcast_ref().unwrap();
642 t.fmt(f)
643 }
644}
645
646impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoIterator for &'a ViewMut<T, G>
647where
648 &'a T: IntoIterator,
649{
650 type Item = <<&'a T as IntoIterator>::IntoIter as Iterator>::Item;
651
652 type IntoIter = <&'a T as IntoIterator>::IntoIter;
653
654 fn into_iter(self) -> Self::IntoIter {
655 self.deref().into_iter()
656 }
657}
658
659#[derive(Default)]
661pub struct Execution {
662 barrier: usize,
663 unscheduled: Vec<Node<Function, TypeKey>>,
664 schedule: Vec<Vec<Node<Function, TypeKey>>>,
665}
666
667impl Execution {
668 pub fn len(&self) -> usize {
670 self.unscheduled.len() + self.schedule.iter().map(|batch| batch.len()).sum::<usize>()
671 }
672
673 pub fn is_empty(&self) -> bool {
675 self.len() == 0
676 }
677}
678
679pub struct BatchResult<'graph> {
680 nodes: &'graph mut Vec<Node<Function, TypeKey>>,
681 resources: &'graph mut TypeMap,
682 results: Vec<Result<Resource, GraphError>>,
683}
684
685impl<'graph> BatchResult<'graph> {
686 pub fn save(
694 self,
695 should_trim_nodes: bool,
696 should_unify_resources: bool,
697 ) -> Result<bool, GraphError> {
698 let BatchResult {
699 nodes,
700 resources,
701 results,
702 } = self;
703 let mut trimmed_any = false;
704 let mut trimmings = vec![false; nodes.len()];
705 for ((node, output), should_trim) in nodes.iter().zip(results).zip(trimmings.iter_mut()) {
706 match output {
707 Err(GraphError::TrimNode) => {
708 *should_trim = true;
712 trimmed_any = should_trim_nodes;
713 }
714 Err(o) => {
715 log::error!("node '{}' erred: {}", node.name(), o);
717 return Err(o);
718 }
719 Ok(output) => (node.inner().save)(output, resources)?,
720 }
721 }
722
723 if trimmed_any {
724 let mut n = 0;
725 nodes.retain_mut(|_| {
726 let should_trim = trimmings[n];
727 n += 1;
728 !should_trim
729 });
730 }
731
732 if should_unify_resources {
733 resources.unify().context(ResourceSnafu)?;
734 }
735
736 Ok(trimmed_any)
737 }
738}
739
740pub struct Batch<'graph> {
741 nodes: &'graph mut Vec<Node<Function, TypeKey>>,
742 resources: &'graph mut TypeMap,
743 }
750
751impl<'a> Batch<'a> {
752 pub fn new(resources: &'a mut TypeMap, nodes: &'a mut Vec<Node<Function, TypeKey>>) -> Self {
754 Batch { resources, nodes }
755 }
756
757 #[cfg(feature = "parallel")]
758 pub fn run(
759 self,
760 local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
761 ) -> Result<BatchResult<'a>, GraphError> {
762 use rayon::prelude::*;
763
764 let Batch { nodes, resources } = self;
765
766 let mut local_f = None;
767 let mut inputs = vec![];
768 let mut runs = vec![];
769 for node in nodes.iter_mut() {
770 log::trace!("preparing node '{}'", node.name());
771 let input = (node.inner().prepare)(resources)?;
772 if let Some(f) = node.inner_mut().run.as_mut() {
773 inputs.push(input);
774 runs.push(f);
775 } else {
776 let f = local.take().context(MissingLocalSnafu)?;
777 local_f = Some((
778 input,
779 Box::new(f) as Box<dyn FnOnce(Resource) -> Result<Resource, GraphError>>,
780 ));
781 }
782 }
783
784 let mut results = inputs
785 .into_par_iter()
786 .zip(runs.into_par_iter())
787 .map(|(input, f)| (f)(input))
788 .collect::<Vec<_>>();
789
790 if let Some((input, f)) = local_f {
791 results.push((f)(input));
792 }
793
794 Ok(BatchResult {
795 nodes,
796 results,
797 resources,
798 })
799 }
800
801 #[cfg(not(feature = "parallel"))]
802 pub fn run(
803 self,
804 local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
805 ) -> Result<BatchResult<'a>, GraphError> {
806 let Batch { nodes, resources } = self;
807
808 let mut local_f = None;
809 let mut inputs = vec![];
810 let mut runs = vec![];
811 for node in nodes.iter_mut() {
812 let input = (node.inner().prepare)(resources)?;
813 if let Some(f) = node.inner_mut().run.as_mut() {
814 inputs.push(input);
815 runs.push(f);
816 } else {
817 let f = local.take().context(MissingLocalSnafu)?;
818 local_f = Some((
819 input,
820 Box::new(f) as Box<dyn FnOnce(Resource) -> Result<Resource, GraphError>>,
821 ));
822 }
823 }
824
825 let mut outputs = inputs
826 .into_iter()
827 .zip(runs.into_iter())
828 .map(|(input, f)| (f)(input))
829 .collect::<Vec<_>>();
830
831 if let Some((input, f)) = local_f {
832 outputs.push((f)(input));
833 }
834
835 Ok(BatchResult {
836 nodes,
837 results: outputs,
838 resources,
839 })
840 }
841}
842
843pub struct Batches<'graph> {
845 schedule: std::slice::IterMut<'graph, Vec<Node<Function, TypeKey>>>,
846 resources: &'graph mut TypeMap,
847}
848
849impl<'graph> Batches<'graph> {
850 pub fn set_resources(&mut self, resources: &'graph mut TypeMap) {
853 self.resources = resources;
854 }
855
856 pub fn next_batch(&mut self) -> Option<Batch> {
857 let nodes: &'graph mut Vec<_> = self.schedule.next()?;
858 let batch = Batch::new(self.resources, nodes);
859 Some(batch)
860 }
861
862 pub fn unify(&mut self) -> bool {
865 self.resources.unify().is_ok()
866 }
867
868 pub fn len(&self) -> usize {
870 self.schedule.len()
871 }
872
873 pub fn is_empty(&self) -> bool {
875 self.len() == 0
876 }
877}
878
879#[derive(Default)]
891pub struct Graph {
892 resources: TypeMap,
893 execution: Execution,
894}
895
896impl Graph {
897 pub fn _resources_mut(&mut self) -> &mut TypeMap {
898 &mut self.resources
899 }
900
901 pub fn node<Input, Output, F: IsGraphNode<Input, Output>>(f: F) -> Node<Function, TypeKey> {
920 f.into_node()
921 }
922
923 pub fn local<Input, Output>() -> Node<Function, TypeKey>
931 where
932 Input: Edges + Any + Send + Sync,
933 Output: NodeResults + Any + Send + Sync,
934 {
935 Node::new(Function {
936 prepare: Box::new(prepare::<Input>),
937 run: None,
938 save: Box::new(save::<Output>),
939 })
940 .with_reads(Input::reads())
941 .with_writes(Input::writes())
942 .with_moves(Input::moves())
943 .with_results(Output::creates())
944 }
945
946 #[deprecated(
947 since = "0.3.3",
948 note = "Ambiguous name. Replaced by `interleave_subgraph` and `add_subgraph`. Use \
949 Graph::interleave_subgraph instead as a direct replacment."
950 )]
951 pub fn merge(mut lhs: Graph, rhs: Graph) -> Graph {
955 lhs.interleave_subgraph(rhs);
956 lhs
957 }
958
959 pub fn interleave_subgraph(&mut self, mut rhs: Graph) -> &mut Self {
1004 self.unschedule();
1005 rhs.unschedule();
1006 let Graph {
1007 resources: mut rhs_resources,
1008 execution:
1009 Execution {
1010 unscheduled: rhs_nodes,
1011 barrier: _,
1012 schedule: _,
1013 },
1014 } = rhs;
1015 self.resources
1016 .extend(std::mem::take(rhs_resources.deref_mut()));
1017 let mut unscheduled: HashMap<String, Node<Function, TypeKey>> = HashMap::default();
1018 let lhs_nodes = std::mem::take(&mut self.execution.unscheduled);
1019 unscheduled.extend(
1020 lhs_nodes
1021 .into_iter()
1022 .map(|node| (node.name().to_string(), node)),
1023 );
1024 unscheduled.extend(
1025 rhs_nodes
1026 .into_iter()
1027 .map(|node| (node.name().to_string(), node)),
1028 );
1029 self.execution.unscheduled = unscheduled.into_iter().map(|v| v.1).collect();
1030 self.execution.barrier = self.execution.barrier.max(rhs.execution.barrier);
1031 self
1032 }
1033
1034 pub fn add_subgraph(&mut self, mut rhs: Graph) -> &mut Self {
1080 self.unschedule();
1081 rhs.unschedule();
1082 let Graph {
1083 resources: mut rhs_resources,
1084 execution:
1085 Execution {
1086 unscheduled: rhs_nodes,
1087 barrier: rhs_barrier,
1088 schedule: _,
1089 },
1090 } = rhs;
1091 let base_barrier = self.execution.barrier;
1092 self.execution.barrier = base_barrier + rhs_barrier;
1093 self.resources
1094 .extend(std::mem::take(rhs_resources.deref_mut()));
1095 self.execution
1096 .unscheduled
1097 .extend(rhs_nodes.into_iter().map(|node| {
1098 let barrier = node.get_barrier();
1099 node.with_barrier(base_barrier + barrier)
1100 }));
1101
1102 self
1103 }
1104
1105 pub fn with_subgraph(mut self, rhs: Graph) -> Self {
1107 self.add_subgraph(rhs);
1108 self
1109 }
1110
1111 fn unschedule(&mut self) {
1113 self.execution.unscheduled.extend(
1114 std::mem::take(&mut self.execution.schedule)
1115 .into_iter()
1116 .flatten(),
1117 );
1118 }
1119
1120 pub fn reschedule(&mut self) -> Result<(), GraphError> {
1125 log::trace!("rescheduling the render graph:");
1126 self.unschedule();
1127 let all_nodes = std::mem::take(&mut self.execution.unscheduled);
1128 let dag = all_nodes
1129 .into_iter()
1130 .fold(Dag::default(), |dag, node| dag.with_node(node));
1131 let schedule =
1132 dag.build_schedule()
1133 .map_err(|dagga::BuildScheduleError { source, mut dag }| {
1134 for node in dag.take_nodes() {
1136 self.add_node(node);
1137 }
1138 GraphError::Scheduling { source }
1139 })?;
1140 self.execution.schedule = schedule.batches;
1141 for batch in self.execution.schedule.iter_mut() {
1143 batch.sort_by(|a, b| a.name().cmp(b.name()));
1144 }
1145
1146 let batched_names = self.get_schedule_and_resources();
1147 log::trace!("{:#?}", batched_names);
1148 Ok(())
1149 }
1150
1151 pub fn get_schedule(&self) -> Vec<Vec<&str>> {
1158 self.execution
1159 .schedule
1160 .iter()
1161 .map(|batch| batch.iter().map(|node| node.name()).collect())
1162 .collect()
1163 }
1164
1165 pub fn get_schedule_and_resources(&self) -> Vec<Vec<(&str, Vec<&str>)>> {
1172 self.execution
1173 .schedule
1174 .iter()
1175 .map(|batch| {
1176 batch
1177 .iter()
1178 .map(|node| {
1179 let name = node.name();
1180 let inputs = node
1181 .all_inputs()
1182 .into_iter()
1183 .map(|key| key.name())
1184 .collect();
1185 (name, inputs)
1186 })
1187 .collect()
1188 })
1189 .collect()
1190 }
1191
1192 pub fn nodes(&self) -> impl Iterator<Item = &Node<Function, TypeKey>> {
1194 self.execution
1195 .schedule
1196 .iter()
1197 .flatten()
1198 .chain(self.execution.unscheduled.iter())
1199 }
1200
1201 pub fn nodes_mut(&mut self) -> impl Iterator<Item = &mut Node<Function, TypeKey>> {
1203 self.execution
1204 .schedule
1205 .iter_mut()
1206 .flatten()
1207 .chain(self.execution.unscheduled.iter_mut())
1208 }
1209
1210 pub fn with_nodes(self, nodes: impl IntoIterator<Item = Node<Function, TypeKey>>) -> Self {
1212 nodes.into_iter().fold(self, Self::with_node)
1213 }
1214
1215 pub fn add_node(&mut self, node: Node<Function, TypeKey>) {
1217 self.execution
1218 .unscheduled
1219 .push(node.runs_after_barrier(self.execution.barrier));
1220 }
1221
1222 pub fn get_node(&self, name: impl AsRef<str>) -> Option<&Node<Function, TypeKey>> {
1224 self.nodes().find(|&node| node.name() == name.as_ref())
1225 }
1226
1227 pub fn get_node_mut(&mut self, name: impl AsRef<str>) -> Option<&mut Node<Function, TypeKey>> {
1229 self.nodes_mut().find(|node| node.name() == name.as_ref())
1230 }
1231
1232 pub fn remove_node(&mut self, name: impl AsRef<str>) -> Option<Node<Function, TypeKey>> {
1236 self.unschedule();
1237 let mut may_index = None;
1238 for (i, node) in self.execution.unscheduled.iter().enumerate() {
1239 if node.name() == name.as_ref() {
1240 may_index = Some(i);
1241 }
1242 }
1243 if let Some(i) = may_index.take() {
1244 Some(self.execution.unscheduled.swap_remove(i))
1245 } else {
1246 None
1247 }
1248 }
1249
1250 pub fn with_node(mut self, node: Node<Function, TypeKey>) -> Self {
1252 self.add_node(node);
1253 self
1254 }
1255
1256 pub fn with_function<Input, Output>(
1258 mut self,
1259 name: impl Into<String>,
1260 f: impl IsGraphNode<Input, Output>,
1261 ) -> Self {
1262 self.add_function(name, f);
1263 self
1264 }
1265
1266 pub fn add_function<Input, Output>(
1268 &mut self,
1269 name: impl Into<String>,
1270 f: impl IsGraphNode<Input, Output>,
1271 ) {
1272 self.add_node(f.into_node().with_name(name));
1273 }
1274
1275 pub fn contains_node(&self, name: impl AsRef<str>) -> bool {
1277 let name = name.as_ref();
1278 let search = |node: &Node<Function, TypeKey>| node.name() == name;
1279 if self.execution.unscheduled.iter().any(search) {
1280 return true;
1281 }
1282 self.execution.schedule.iter().flatten().any(search)
1283 }
1284
1285 pub fn contains_resource<T: Any + Send + Sync>(&self) -> bool {
1288 let key = TypeKey::new::<T>();
1289 self.resources.contains_key(&key)
1290 }
1291
1292 pub fn with_resource<T: Any + Send + Sync>(mut self, t: T) -> Self {
1296 self.add_resource(t);
1297 self
1298 }
1299
1300 pub fn add_resource<T: Any + Send + Sync>(&mut self, t: T) {
1304 self.resources.insert_value(t).unwrap();
1306 }
1307
1308 pub fn add_barrier(&mut self) {
1313 self.execution.barrier += 1;
1314 }
1315
1316 pub fn with_barrier(mut self) -> Self {
1321 self.add_barrier();
1322 self
1323 }
1324
1325 pub fn get_barrier(&self) -> usize {
1329 self.execution.barrier
1330 }
1331
1332 pub fn add_local<Input, Output>(&mut self, name: impl Into<String>)
1340 where
1341 Input: Edges + Any + Send + Sync,
1342 Output: NodeResults + Any + Send + Sync,
1343 {
1344 self.add_node(Self::local::<Input, Output>().with_name(name));
1345 }
1346
1347 pub fn with_local<Input, Output>(mut self, name: impl Into<String>) -> Self
1348 where
1349 Input: Edges + Any + Send + Sync,
1350 Output: NodeResults + Any + Send + Sync,
1351 {
1352 self.add_local::<Input, Output>(name);
1353 self
1354 }
1355
1356 pub fn reschedule_if_necessary(&mut self) -> Result<(), GraphError> {
1360 if !self.execution.unscheduled.is_empty() {
1361 self.reschedule()?;
1362 }
1363 Ok(())
1364 }
1365
1366 pub fn batches(&mut self) -> Batches {
1370 Batches {
1371 schedule: self.execution.schedule.iter_mut(),
1372 resources: &mut self.resources,
1373 }
1374 }
1375
1376 pub fn run(&mut self) -> Result<(), GraphError> {
1378 self.run_with_local(missing_local)
1379 }
1380
1381 pub fn run_with_local<Input, Output, E>(
1383 &mut self,
1384 f: impl FnOnce(Input) -> Result<Output, E>,
1385 ) -> Result<(), GraphError>
1386 where
1387 Input: Edges + Any + Send + Sync,
1388 Output: NodeResults + Any + Send + Sync,
1389 E: ToString,
1390 {
1391 let mut local = Some(Box::new(move |resources: Resource| {
1392 let input = *resources.downcast::<Input>().unwrap();
1393 match (f)(input) {
1394 Ok(creates) => Ok(Box::new(creates) as Resource),
1395 Err(e) => Err(GraphError::RunningLocalNode {
1396 error: e.to_string(),
1397 }),
1398 }
1399 })
1400 as Box<dyn FnOnce(Resource) -> Result<Resource, GraphError>>);
1401
1402 self.reschedule_if_necessary()?;
1403
1404 let mut got_trimmed = false;
1405 let mut batches = self.batches();
1406 while let Some(batch) = batches.next_batch() {
1407 let batch_result = batch.run(&mut local)?;
1408 let did_trim_batch = batch_result.save(true, true)?;
1409 got_trimmed = got_trimmed || did_trim_batch;
1410 }
1411 if got_trimmed {
1412 self.reschedule()?;
1413 }
1414
1415 Ok(())
1416 }
1417
1418 pub fn remove_resource<T: Any + Send + Sync>(&mut self) -> Result<Option<T>, GraphError> {
1422 let key = TypeKey::new::<T>();
1423 if let Some(inner_loan) = self.resources.remove(&key) {
1424 match inner_loan.into_owned(key.name()) {
1425 Ok(value) => {
1426 let box_t = value.downcast::<T>().unwrap();
1429 Ok(Some(*box_t))
1430 }
1431 Err(loan) => {
1432 self.resources.insert(key, loan);
1433 let err = ResourceLoanedSnafu {
1434 type_name: std::any::type_name::<T>(),
1435 }
1436 .build();
1437 log::error!("{err}");
1438 Err(err)
1439 }
1440 }
1441 } else {
1442 Ok(None)
1444 }
1445 }
1446
1447 pub fn get_resource<T: Any + Send + Sync>(&self) -> Result<Option<&T>, GraphError> {
1451 self.resources.get_value().context(ResourceSnafu)
1452 }
1453
1454 pub fn get_resource_mut<T: Any + Send + Sync>(&mut self) -> Result<Option<&mut T>, GraphError> {
1458 self.resources.get_value_mut().context(ResourceSnafu)
1459 }
1460
1461 pub fn visit<T: Edges, S>(&mut self, f: impl FnOnce(T) -> S) -> Result<S, GraphError> {
1510 let t = T::construct(&mut self.resources)?;
1511 let s = f(t);
1512 self.resources.unify().context(ResourceSnafu)?;
1513 Ok(s)
1514 }
1515
1516 pub fn into_parts(self) -> (Execution, TypeMap) {
1518 (self.execution, self.resources)
1519 }
1520
1521 #[cfg(feature = "dot")]
1522 pub fn save_graph_dot(&self, path: &str) {
1525 use dagga::dot::DagLegend;
1526
1527 let legend =
1528 DagLegend::new(self.nodes()).with_resources_named(|ty: &TypeKey| ty.name().to_string());
1529 legend.save_to(path).unwrap();
1530 }
1531
1532 pub fn _add_node_constraint(
1534 constraint: &str,
1535 i: &mut Node<Function, TypeKey>,
1536 j: Option<String>,
1537 ) {
1538 match constraint {
1540 ">" => {
1541 i.add_runs_after(j.unwrap());
1542 }
1543 "<" => {
1544 i.add_runs_before(j.unwrap());
1545 }
1546 _ => {}
1547 }
1548 }
1549
1550 pub fn _last_node(&self) -> Option<String> {
1551 self.execution
1552 .unscheduled
1553 .last()
1554 .map(|node| node.name().to_string())
1555 }
1556
1557 pub fn node_len(&self) -> usize {
1558 self.execution.len()
1559 }
1560
1561 pub fn take_next_batch_of_nodes(&mut self) -> Option<Vec<Node<Function, TypeKey>>> {
1565 if self.execution.schedule.is_empty() {
1566 None
1567 } else {
1568 self.execution.schedule.drain(0..1).next()
1569 }
1570 }
1571}
1572
1573#[macro_export]
1606macro_rules! graph {
1607 ($i:ident $op:tt $($tail:tt)*) => {{
1608 let mut g = graph!($($tail)*);
1609 let tail = g._last_node();
1610 if let Some(node) = g.get_node_mut(stringify!($i)) {
1611 Graph::_add_node_constraint(stringify!($op), node, tail);
1612 } else {
1613 g.add_node({
1614 let mut node = Graph::node($i).with_name(stringify!($i));
1615 Graph::_add_node_constraint(stringify!($op), &mut node, tail);
1616 node
1617 });
1618 }
1619 g
1620 }};
1621
1622 ($i:ident$(,)?) => {
1623 Graph::default().with_node(Graph::node($i).with_name(stringify!($i)))
1624 }
1625}
1626
1627#[cfg(test)]
1628mod test {
1629 use super::*;
1630
1631 fn create(_: ()) -> Result<(usize,), GraphError> {
1632 Ok((0,))
1633 }
1634
1635 fn edit((mut num,): (ViewMut<usize>,)) -> Result<(), GraphError> {
1636 *num += 1;
1637 Ok(())
1638 }
1639
1640 fn finish((num,): (Move<usize>,)) -> Result<(), GraphError> {
1641 assert_eq!(1, num.into(), "edit did not run");
1642 Ok(())
1643 }
1644
1645 #[test]
1646 fn function_to_node() {
1647 let mut graph = Graph::default()
1649 .with_function("create", create)
1650 .with_function("edit", edit);
1651 graph.run().unwrap();
1652 assert_eq!(1, *graph.get_resource::<usize>().unwrap().unwrap());
1653
1654 let mut graph = graph.with_function("finish", finish);
1655 graph.run().unwrap();
1656 assert!(graph.get_resource::<usize>().unwrap().is_none());
1657 }
1658
1659 #[test]
1660 fn many_inputs_many_outputs() {
1661 use crate as moongraph;
1662 fn start(_: ()) -> Result<(usize, u32, f32, f64, &'static str, String), GraphError> {
1664 Ok((0, 0, 0.0, 0.0, "hello", "HELLO".into()))
1665 }
1666
1667 fn modify_ints(
1668 (mut numusize, mut numu32): (ViewMut<usize>, ViewMut<u32>),
1669 ) -> Result<(), GraphError> {
1670 *numusize += 1;
1671 *numu32 += 1;
1672 Ok(())
1673 }
1674
1675 fn modify_floats(
1676 (mut numf32, mut numf64): (ViewMut<f32>, ViewMut<f64>),
1677 ) -> Result<(), GraphError> {
1678 *numf32 += 10.0;
1679 *numf64 += 10.0;
1680 Ok(())
1681 }
1682
1683 fn modify_strings(
1684 (mut strstatic, mut strowned): (ViewMut<&'static str>, ViewMut<String>),
1685 ) -> Result<(), GraphError> {
1686 *strstatic = "goodbye";
1687 *strowned = "GOODBYE".into();
1688 Ok(())
1689 }
1690
1691 #[derive(Edges)]
1692 struct EndData(
1693 Move<usize>,
1694 Move<u32>,
1695 Move<f32>,
1696 Move<f64>,
1697 Move<&'static str>,
1698 Move<String>,
1699 );
1700
1701 fn end(
1702 EndData(nusize, nu32, nf32, nf64, sstatic, sowned): EndData,
1703 ) -> Result<(bool,), GraphError> {
1704 assert_eq!(1, *nusize);
1705 assert_eq!(1, *nu32);
1706 assert_eq!(10.0, *nf32);
1707 assert_eq!(10.0, *nf64);
1708 assert_eq!("goodbye", *sstatic);
1709 assert_eq!("GOODBYE", *sowned);
1710 Ok((true,))
1711 }
1712
1713 let mut graph = Graph::default()
1714 .with_function("start", start)
1715 .with_function("modify_ints", modify_ints)
1716 .with_function("modify_floats", modify_floats)
1717 .with_function("modify_strings", modify_strings)
1718 .with_function("end", end);
1719
1720 graph.reschedule().unwrap();
1721 let schedule = graph.get_schedule();
1722 assert_eq!(
1723 vec![
1724 vec!["start"],
1725 vec!["modify_floats", "modify_ints", "modify_strings"],
1726 vec!["end"]
1727 ],
1728 schedule,
1729 "schedule is wrong"
1730 );
1731
1732 graph.run().unwrap();
1733 let run_was_all_good = graph.get_resource::<bool>().unwrap().unwrap();
1734 assert!(run_was_all_good, "run was not all good");
1735 }
1736
1737 #[test]
1738 fn can_derive() {
1739 #[derive(Edges)]
1740 #[moongraph(crate = crate)]
1741 struct Input {
1742 num_usize: View<usize>,
1743 num_f32: ViewMut<f32>,
1744 num_f64: Move<f64>,
1745 }
1746
1747 type Output = (String, &'static str);
1748
1749 fn start(_: ()) -> Result<(usize, f32, f64), GraphError> {
1750 Ok((1, 0.0, 10.0))
1751 }
1752
1753 fn end(mut input: Input) -> Result<Output, GraphError> {
1754 *input.num_f32 += *input.num_f64 as f32;
1755 Ok((
1756 format!("{},{},{}", *input.num_usize, *input.num_f32, *input.num_f64),
1757 "done",
1758 ))
1759 }
1760
1761 let mut graph = Graph::default()
1762 .with_function("start", start)
1763 .with_function("end", end);
1764 graph.run().unwrap();
1765 assert_eq!(
1766 "1,10,10",
1767 graph.get_resource::<String>().unwrap().unwrap().as_str()
1768 );
1769 }
1770
1771 #[test]
1772 fn can_visit_and_then_borrow() {
1773 use crate as moongraph;
1774
1775 #[derive(Edges)]
1776 struct Input {
1777 num_usize: View<usize>,
1778 num_f32: ViewMut<f32>,
1779 num_f64: Move<f64>,
1780 }
1781
1782 let mut graph = Graph::default()
1783 .with_resource(0usize)
1784 .with_resource(0.0f32)
1785 .with_resource(0.0f64);
1786 let num_usize = graph
1787 .visit(|mut input: Input| {
1788 *input.num_f32 = 666.0;
1789 *input.num_f64 += 10.0;
1790 *input.num_usize
1791 })
1792 .unwrap();
1793 assert_eq!(0, num_usize);
1794 assert_eq!(0, *graph.get_resource::<usize>().unwrap().unwrap());
1795 assert_eq!(666.0, *graph.get_resource::<f32>().unwrap().unwrap());
1796 assert!(!graph.contains_resource::<f64>());
1797 }
1798
1799 #[cfg(feature = "none")]
1800 #[test]
1801 fn can_run_local() {
1802 fn start(_: ()) -> Result<(usize, u32, f32, f64, &'static str, String), GraphError> {
1803 Ok((0, 0, 0.0, 0.0, "hello", "HELLO".into()))
1804 }
1805
1806 fn modify_ints(
1807 (mut numusize, mut numu32): (ViewMut<usize>, ViewMut<u32>),
1808 ) -> Result<(), GraphError> {
1809 *numusize += 1;
1810 *numu32 += 1;
1811 Ok(())
1812 }
1813
1814 fn modify_floats(
1815 (mut numf32, mut numf64): (ViewMut<f32>, ViewMut<f64>),
1816 ) -> Result<(), GraphError> {
1817 *numf32 += 10.0;
1818 *numf64 += 10.0;
1819 Ok(())
1820 }
1821
1822 fn modify_strings(
1823 (mut strstatic, mut strowned): (ViewMut<&'static str>, ViewMut<String>),
1824 ) -> Result<(), GraphError> {
1825 *strstatic = "goodbye";
1826 *strowned = "GOODBYE".into();
1827 Ok(())
1828 }
1829
1830 fn end(
1831 (nusize, nu32, nf32, nf64, sstatic, sowned): (
1832 Move<usize>,
1833 Move<u32>,
1834 Move<f32>,
1835 Move<f64>,
1836 Move<&'static str>,
1837 Move<String>,
1838 ),
1839 ) -> Result<(bool,), GraphError> {
1840 assert_eq!(1, *nusize);
1841 assert_eq!(10, *nu32);
1842 assert_eq!(100.0, *nf32);
1843 assert_eq!(10.0, *nf64);
1844 assert_eq!("goodbye", *sstatic);
1845 assert_eq!("GOODBYE", *sowned);
1846 Ok((true,))
1847 }
1848
1849 let mut graph = graph!(start, modify_ints, modify_floats, modify_strings, end,)
1850 .with_local::<(ViewMut<u32>, ViewMut<f32>), ()>("local");
1851
1852 graph.reschedule().unwrap();
1853 assert_eq!(
1854 vec![
1855 vec!["start"],
1856 vec!["modify_strings", "modify_floats", "modify_ints"],
1857 vec!["local"],
1858 vec!["end"]
1859 ],
1860 graph.get_schedule(),
1861 "schedule is wrong"
1862 );
1863
1864 let mut my_num = 0.0;
1865 graph
1866 .run_with_local(
1867 |(mut nu32, mut nf32): (ViewMut<u32>, ViewMut<f32>)| -> Result<(), String> {
1868 *nu32 *= 10;
1869 *nf32 *= 10.0;
1870 my_num = *nu32 as f32 + *nf32;
1871 Ok(())
1872 },
1873 )
1874 .unwrap();
1875 let run_was_all_good = graph.get_resource::<bool>().unwrap().unwrap();
1876 assert!(run_was_all_good, "run was not all good");
1877 assert_eq!(110.0, my_num, "local did not run");
1878 }
1879
1880 #[test]
1881 fn can_generate_view_default() {
1884 let mut graph = Graph::default();
1885 let u = graph.visit(|u: View<usize>| *u).unwrap();
1886 assert_eq!(0, u);
1887
1888 let my_u = graph.get_resource::<usize>().unwrap();
1889 assert_eq!(Some(0), my_u.copied());
1890 }
1891}