moongraph/
lib.rs

1//! DAG scheduling, resource management, and execution.
2//!
3//! In `moongraph`, nodes are functions with parameters that are accessed
4//! immutably, mutably or by move.
5//!
6//! `moongraph` validates and schedules nodes to run in parallel where possible,
7//! using `rayon` as the underlying parallelizing tech.
8
9use std::{
10    any::Any,
11    collections::HashMap,
12    marker::PhantomData,
13    ops::{Deref, DerefMut},
14};
15
16use broomdog::{Loan, LoanMut};
17use dagga::Dag;
18use snafu::prelude::*;
19
20pub use broomdog::{BroomdogErr, TypeKey, TypeMap};
21pub use dagga::{DaggaError, Node};
22#[cfg(feature = "derive")]
23pub use moongraph_macros::Edges;
24
25#[cfg(feature = "tutorial")]
26mod tutorial_impl;
27
28#[cfg(feature = "tutorial")]
29pub use tutorial_impl::tutorial;
30
31#[cfg(feature = "parallel")]
32pub mod rayon_impl;
33
34/// All errors.
35#[derive(Debug, Snafu)]
36pub enum GraphError {
37    #[snafu(display("Error while running local node {error}"))]
38    RunningLocalNode { error: String },
39
40    #[snafu(display("Error scheduling the graph: {source}"))]
41    Scheduling { source: dagga::DaggaError },
42
43    #[snafu(display("Resource error: {source}"))]
44    Resource { source: broomdog::BroomdogErr },
45
46    #[snafu(display("Resource '{type_name}' is loaned"))]
47    ResourceLoaned { type_name: &'static str },
48
49    #[snafu(display("Missing resource '{name}'"))]
50    Missing { name: &'static str },
51
52    #[snafu(display("Encountered local function that was not provided or already run"))]
53    MissingLocal,
54
55    #[snafu(display("Node should be trimmed"))]
56    TrimNode,
57
58    #[snafu(display("Unrecoverable error while running node: {source}"))]
59    Other {
60        source: Box<dyn std::error::Error + Send + Sync + 'static>,
61    },
62}
63
64impl From<broomdog::BroomdogErr> for GraphError {
65    fn from(source: broomdog::BroomdogErr) -> Self {
66        GraphError::Resource { source }
67    }
68}
69
70impl GraphError {
71    pub fn other(err: impl std::error::Error + Send + Sync + 'static) -> Self {
72        GraphError::Other {
73            source: Box::new(err),
74        }
75    }
76}
77
78/// Returns a result meaning everything is ok and the node should run again next frame.
79pub fn ok() -> Result<(), GraphError> {
80    Ok(())
81}
82
83/// Returns a result meaning everything is ok, but the node should be removed from the graph.
84pub fn end() -> Result<(), GraphError> {
85    Err(GraphError::TrimNode)
86}
87
88/// Returns a result meaning an error occured and the graph cannot recover.
89pub fn err(err: impl std::error::Error + Send + Sync + 'static) -> Result<(), GraphError> {
90    Err(GraphError::other(err))
91}
92
/// A type-erased, thread-safe boxed value (a node's packed input or output).
pub type Resource = Box<dyn Any + Send + Sync>;
/// Builds a node's boxed input from the resources stored in a [`TypeMap`].
pub type FnPrepare = dyn Fn(&mut TypeMap) -> Result<Resource, GraphError>;
/// Runs a node: consumes its boxed input and yields its boxed output.
pub type FnMutRun = dyn FnMut(Resource) -> Result<Resource, GraphError> + Send + Sync;
/// Stores a node's boxed output back into a [`TypeMap`].
pub type FnSave = dyn Fn(Resource, &mut TypeMap) -> Result<(), GraphError>;
97
98/// A function wrapper.
99///
100/// Wraps a function by moving it into a closure. Before running, the parameters
101/// of the function are constructed from a TypeMap of resources. The results
102/// of the function are packed back into the same TypeMap.
103pub struct Function {
104    prepare: Box<FnPrepare>,
105    run: Option<Box<FnMutRun>>,
106    save: Box<FnSave>,
107}
108
109impl Function {
110    /// Run the function using the given `TypeMap`.
111    pub fn run(
112        &mut self,
113        resources: Resource,
114        local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
115    ) -> Result<Resource, GraphError> {
116        if let Some(f) = self.run.as_mut() {
117            (f)(resources)
118        } else {
119            let local = local.take().context(MissingLocalSnafu)?;
120            (local)(resources)
121        }
122    }
123
124    /// Create a new function.
125    pub fn new(
126        prepare: impl Fn(&mut TypeMap) -> Result<Resource, GraphError> + 'static,
127        run: impl Fn(Resource) -> Result<Resource, GraphError> + Send + Sync + 'static,
128        save: impl Fn(Resource, &mut TypeMap) -> Result<(), GraphError> + 'static,
129    ) -> Self {
130        Function {
131            prepare: Box::new(prepare),
132            run: Some(Box::new(run)),
133            save: Box::new(save),
134        }
135    }
136}
137
138fn missing_local(_: ()) -> Result<(), GraphError> {
139    Err(GraphError::MissingLocal)
140}
141
142/// Trait for describing types that are made up of graph edges (ie resources).
143///
144/// Graph edges are the _resources_ that graph nodes (ie functions) consume.
145///
146/// The `Edges` trait allows the library user to construct types that use
147/// resources. This is convenient when the number of resources becomes large
148/// and using a tuple becomes unwieldy.
149///
150/// There exists a derive macro [`Edges`](derive@Edges) to help implementing
151/// this trait.
152pub trait Edges: Sized {
153    /// Keys of all read types used in fields in the implementor.
154    fn reads() -> Vec<TypeKey> {
155        vec![]
156    }
157
158    /// Keys of all write types used in fields in the implementor.
159    fn writes() -> Vec<TypeKey> {
160        vec![]
161    }
162
163    /// Keys of all move types used in fields in the implementor.
164    fn moves() -> Vec<TypeKey> {
165        vec![]
166    }
167
168    /// Attempt to construct the implementor from the given `TypeMap`.
169    fn construct(resources: &mut TypeMap) -> Result<Self, GraphError>;
170}
171
172impl Edges for () {
173    fn construct(_: &mut TypeMap) -> Result<Self, GraphError> {
174        Ok(())
175    }
176}
177
178macro_rules! impl_edges {
179    ($($t:ident),+) => {
180        impl<$($t: Edges),+> Edges for ($($t,)+) {
181            fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
182                Ok((
183                    $( $t::construct(resources)?, )+
184                ))
185            }
186
187            fn reads() -> Vec<TypeKey> {
188                vec![
189                    $( $t::reads(), )+
190                ].concat()
191            }
192
193            fn writes() -> Vec<TypeKey> {
194                vec![
195                    $( $t::writes(), )+
196                ].concat()
197            }
198
199            fn moves() -> Vec<TypeKey> {
200                vec![
201                    $( $t::moves(), )+
202                ].concat()
203            }
204        }
205    }
206}
207
208impl_edges!(A);
209impl_edges!(A, B);
210impl_edges!(A, B, C);
211impl_edges!(A, B, C, D);
212impl_edges!(A, B, C, D, E);
213impl_edges!(A, B, C, D, E, F);
214impl_edges!(A, B, C, D, E, F, G);
215impl_edges!(A, B, C, D, E, F, G, H);
216impl_edges!(A, B, C, D, E, F, G, H, I);
217impl_edges!(A, B, C, D, E, F, G, H, I, J);
218impl_edges!(A, B, C, D, E, F, G, H, I, J, K);
219impl_edges!(A, B, C, D, E, F, G, H, I, J, K, L);
220
221/// Trait for describing types that are the result of running a node.
222///
223/// When a node runs it may result in the creation of graph edges (ie
224/// resources). Graph edges are the _resources_ that other nodes (ie functions)
225/// consume.
226///
227/// The `NodeResults` trait allows the library user to emit tuples of resources
228/// that will then be stored in the graph for downstream nodes to use as input.
229pub trait NodeResults {
230    /// All keys of types/resources created.
231    fn creates() -> Vec<TypeKey>;
232
233    /// Attempt to pack the implementor's constituent resources into the given
234    /// `TypeMap`.
235    fn save(self, resources: &mut TypeMap) -> Result<(), GraphError>;
236}
237
238impl NodeResults for () {
239    fn creates() -> Vec<TypeKey> {
240        vec![]
241    }
242
243    fn save(self, _: &mut TypeMap) -> Result<(), GraphError> {
244        Ok(())
245    }
246}
247
248macro_rules! impl_node_results {
249    ($(($t:ident, $n:tt)),+) => {
250        impl<$( $t : Any + Send + Sync ),+> NodeResults for ($($t,)+) {
251            fn creates() -> Vec<TypeKey> {
252                vec![$( TypeKey::new::<$t>() ),+]
253            }
254
255            fn save(self, resources: &mut TypeMap) -> Result<(), GraphError> {
256                $( let _ = resources.insert_value( self.$n ); )+
257                Ok(())
258            }
259        }
260    }
261}
262
263impl_node_results!((A, 0));
264impl_node_results!((A, 0), (B, 1));
265impl_node_results!((A, 0), (B, 1), (C, 2));
266impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3));
267impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3), (E, 4));
268impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3), (E, 4), (F, 5));
269impl_node_results!((A, 0), (B, 1), (C, 2), (D, 3), (E, 4), (F, 5), (G, 6));
270impl_node_results!(
271    (A, 0),
272    (B, 1),
273    (C, 2),
274    (D, 3),
275    (E, 4),
276    (F, 5),
277    (G, 6),
278    (H, 7)
279);
280impl_node_results!(
281    (A, 0),
282    (B, 1),
283    (C, 2),
284    (D, 3),
285    (E, 4),
286    (F, 5),
287    (G, 6),
288    (H, 7),
289    (I, 8)
290);
291impl_node_results!(
292    (A, 0),
293    (B, 1),
294    (C, 2),
295    (D, 3),
296    (E, 4),
297    (F, 5),
298    (G, 6),
299    (H, 7),
300    (I, 8),
301    (J, 9)
302);
303impl_node_results!(
304    (A, 0),
305    (B, 1),
306    (C, 2),
307    (D, 3),
308    (E, 4),
309    (F, 5),
310    (G, 6),
311    (H, 7),
312    (I, 8),
313    (J, 9),
314    (K, 10)
315);
316impl_node_results!(
317    (A, 0),
318    (B, 1),
319    (C, 2),
320    (D, 3),
321    (E, 4),
322    (F, 5),
323    (G, 6),
324    (H, 7),
325    (I, 8),
326    (J, 9),
327    (K, 10),
328    (L, 11)
329);
330
331fn prepare<Input: Edges + Any + Send + Sync>(
332    resources: &mut TypeMap,
333) -> Result<Resource, GraphError> {
334    log::trace!("preparing input {}", std::any::type_name::<Input>());
335    let input = Input::construct(resources)?;
336    Ok(Box::new(input))
337}
338
339fn save<Output: NodeResults + Any + Send + Sync>(
340    creates: Resource,
341    resources: &mut TypeMap,
342) -> Result<(), GraphError> {
343    log::trace!("saving output {}", std::any::type_name::<Output>());
344    let creates = *creates.downcast::<Output>().unwrap();
345    creates.save(resources)
346}
347
348/// Defines graph nodes.
349///
350/// A node in the graph is a boxed Rust closure that may do any or all the
351/// following:
352///
353/// * Create resources by returning a result that implements [`NodeResults`].
354/// * Consume one or more resources by having a field in the input parameter
355///   wrapped in [`Move`]. The resource will not be available in the graph after
356///   the node is run.
357/// * Read one or more resources by having a field in the input parameter
358///   wrapped in [`View`].
359/// * Write one or more resources by having a field in the input parameter
360///   wrapped in [`ViewMut`].
361///
362/// By default `IsGraphNode` is implemented for functions that take one
363/// parameter implementing [`Edges`] and returning a `Result` where the "ok"
364/// type implements `NodeResults`.
365pub trait IsGraphNode<Input, Output> {
366    /// Convert the implementor into a `Node`.
367    fn into_node(self) -> Node<Function, TypeKey>;
368}
369
370impl<
371        Input: Edges + Any + Send + Sync,
372        Output: NodeResults + Any + Send + Sync,
373        F: FnMut(Input) -> Result<Output, E> + Send + Sync + 'static,
374        E: Into<GraphError>,
375    > IsGraphNode<Input, Output> for F
376{
377    fn into_node(mut self) -> Node<Function, TypeKey> {
378        let prepare = Box::new(prepare::<Input>);
379        let save = Box::new(save::<Output>);
380
381        let inner = Box::new(move |resources: Resource| -> Result<Resource, GraphError> {
382            let input = *resources.downcast::<Input>().unwrap();
383            match (self)(input) {
384                Ok(creates) => Ok(Box::new(creates)),
385                Err(e) => Err(e.into()),
386            }
387        });
388        Node::new(Function {
389            prepare,
390            run: Some(inner),
391            save,
392        })
393        .with_reads(Input::reads())
394        .with_writes(Input::writes())
395        .with_moves(Input::moves())
396        .with_results(Output::creates())
397    }
398}
399
400/// Specifies a graph edge/resource that is "moved" by a node.
401pub struct Move<T> {
402    inner: T,
403}
404
405impl<T: Any + Send + Sync> Edges for Move<T> {
406    fn moves() -> Vec<TypeKey> {
407        vec![TypeKey::new::<T>()]
408    }
409
410    fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
411        let key = TypeKey::new::<T>();
412        let inner_loan = resources
413            .remove(&key)
414            .context(MissingSnafu { name: key.name() })?;
415        match inner_loan.into_owned(key.name()) {
416            Ok(value) => {
417                // UNWRAP: safe because we got this out as `T`
418                let box_t = value.downcast::<T>().unwrap();
419                Ok(Move { inner: *box_t })
420            }
421            Err(loan) => {
422                // We really do **not** want to lose any resources
423                resources.insert(key, loan);
424                let err = ResourceLoanedSnafu {
425                    type_name: std::any::type_name::<T>(),
426                }
427                .build();
428                log::error!("{err}");
429                Err(err)
430            }
431        }
432    }
433}
434
435impl<T> Move<T> {
436    /// Convert into its inner type.
437    pub fn into(self) -> T {
438        self.inner
439    }
440}
441
442impl<T> Deref for Move<T> {
443    type Target = T;
444
445    fn deref(&self) -> &Self::Target {
446        &self.inner
447    }
448}
449
450impl<T> DerefMut for Move<T> {
451    fn deref_mut(&mut self) -> &mut Self::Target {
452        &mut self.inner
453    }
454}
455
456impl<T: std::fmt::Display> std::fmt::Display for Move<T> {
457    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
458        self.inner.fmt(f)
459    }
460}
461
/// Strategy for producing a default value of a resource, if possible.
pub trait Gen<T> {
    /// Produce the value, or `None` when this strategy cannot produce one.
    fn generate() -> Option<T>;
}

/// Valueless type that represents the ability to generate a resource by
/// default.
pub struct SomeDefault;

impl<T: Default> Gen<T> for SomeDefault {
    /// Always yields `T`'s `Default` value.
    fn generate() -> Option<T> {
        Some(Default::default())
    }
}

/// Valueless type that represents the **inability** to generate a resource by default.
pub struct NoDefault;

impl<T> Gen<T> for NoDefault {
    /// Never yields a value.
    fn generate() -> Option<T> {
        Option::None
    }
}
485
486/// Immutably borrowed resource that _may_ be created by default.
487///
488/// Node functions wrap their parameters in [`View`], [`ViewMut`] or [`Move`].
489///
490/// `View` has two type parameters:
491/// * `T` - The type of the resource.
492/// * `G` - The method by which the resource can be generated if it doesn't
493///   already exist. By default this is [`SomeDefault`], which denotes creating the
494///   resource using its default instance. Another option is [`NoDefault`] which
495///   fails to generate the resource.
496///
497/// ```rust
498/// use moongraph::*;
499///
500/// let mut graph = Graph::default();
501/// let default_number = graph.visit(|u: View<usize>| { *u }).map_err(|e| e.to_string());
502/// assert_eq!(Ok(0), default_number);
503///
504/// let no_number = graph.visit(|f: View<f32, NoDefault>| *f);
505/// assert!(no_number.is_err());
506/// ```
507pub struct View<T, G: Gen<T> = SomeDefault> {
508    inner: Loan,
509    _phantom: PhantomData<(T, G)>,
510}
511
512impl<T: Any + Send + Sync, G: Gen<T>> Deref for View<T, G> {
513    type Target = T;
514
515    fn deref(&self) -> &Self::Target {
516        // UNWRAP: safe because it was constructed with `T`
517        self.inner.downcast_ref().unwrap()
518    }
519}
520
521impl<T: Any + Send + Sync, G: Gen<T>> Edges for View<T, G> {
522    fn reads() -> Vec<TypeKey> {
523        vec![TypeKey::new::<T>()]
524    }
525
526    fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
527        let key = TypeKey::new::<T>();
528        let inner = match resources.loan(key).context(ResourceSnafu)? {
529            Some(inner) => inner,
530            None => {
531                let t = G::generate().context(MissingSnafu {
532                    name: std::any::type_name::<T>(),
533                })?;
534                // UNWRAP: safe because we know this type was missing, and no other type
535                // is stored with this type's type id.
536                let _ = resources.insert_value(t).unwrap();
537                log::trace!("generated missing {}", std::any::type_name::<T>());
538                // UNWRAP: safe because we just inserted
539                resources.loan(key).unwrap().unwrap()
540            }
541        };
542        Ok(View {
543            inner,
544            _phantom: PhantomData,
545        })
546    }
547}
548
549impl<T: std::fmt::Display + Any + Send + Sync, G: Gen<T>> std::fmt::Display for View<T, G> {
550    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551        let t: &T = self.inner.downcast_ref().unwrap();
552        t.fmt(f)
553    }
554}
555
556impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoIterator for &'a View<T, G>
557where
558    &'a T: IntoIterator,
559{
560    type Item = <<&'a T as IntoIterator>::IntoIter as Iterator>::Item;
561
562    type IntoIter = <&'a T as IntoIterator>::IntoIter;
563
564    fn into_iter(self) -> Self::IntoIter {
565        self.deref().into_iter()
566    }
567}
568
569/// A mutably borrowed resource that may be created by default.
570///
571/// Node functions wrap their parameters in [`View`], [`ViewMut`] or [`Move`].
572///
573/// `ViewMut` has two type parameters:
574/// * `T` - The type of the resource.
575/// * `G` - The method by which the resource can be generated if it doesn't
576///   already exist. By default this is [`SomeDefault`], which denotes creating
577///   the resource using its default implementation. Another option is
578///   [`NoDefault`] which fails to generate the resource.
579///
580/// ```rust
581/// use moongraph::*;
582///
583/// let mut graph = Graph::default();
584/// let default_number = graph.visit(|u: ViewMut<usize>| { *u }).map_err(|e| e.to_string());
585/// assert_eq!(Ok(0), default_number);
586///
587/// let no_number = graph.visit(|f: ViewMut<f32, NoDefault>| *f);
588/// assert!(no_number.is_err());
589/// ```
590pub struct ViewMut<T, G: Gen<T> = SomeDefault> {
591    inner: LoanMut,
592    _phantom: PhantomData<(T, G)>,
593}
594
595impl<T: Any + Send + Sync, G: Gen<T>> Deref for ViewMut<T, G> {
596    type Target = T;
597
598    fn deref(&self) -> &Self::Target {
599        // UNWRAP: safe because it was constructed with `T`
600        self.inner.downcast_ref().unwrap()
601    }
602}
603
604impl<T: Any + Send + Sync, G: Gen<T>> DerefMut for ViewMut<T, G> {
605    fn deref_mut(&mut self) -> &mut Self::Target {
606        // UNWRAP: safe because it was constructed with `T`
607        self.inner.downcast_mut().unwrap()
608    }
609}
610
611impl<T: Any + Send + Sync, G: Gen<T>> Edges for ViewMut<T, G> {
612    fn writes() -> Vec<TypeKey> {
613        vec![TypeKey::new::<T>()]
614    }
615
616    fn construct(resources: &mut TypeMap) -> Result<Self, GraphError> {
617        let key = TypeKey::new::<T>();
618        let inner = match resources.loan_mut(key).context(ResourceSnafu)? {
619            Some(inner) => inner,
620            None => {
621                let t = G::generate().context(MissingSnafu {
622                    name: std::any::type_name::<T>(),
623                })?;
624                // UNWRAP: safe because we know this type was missing, and no other type
625                // is stored with this type's type id.
626                let _ = resources.insert_value(t).unwrap();
627                log::trace!("generated missing {}", std::any::type_name::<T>());
628                // UNWRAP: safe because we just inserted
629                resources.loan_mut(key).unwrap().unwrap()
630            }
631        };
632        Ok(ViewMut {
633            inner,
634            _phantom: PhantomData,
635        })
636    }
637}
638
639impl<T: std::fmt::Display + Any + Send + Sync, G: Gen<T>> std::fmt::Display for ViewMut<T, G> {
640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641        let t: &T = self.inner.downcast_ref().unwrap();
642        t.fmt(f)
643    }
644}
645
646impl<'a, T: Send + Sync + 'static, G: Gen<T>> IntoIterator for &'a ViewMut<T, G>
647where
648    &'a T: IntoIterator,
649{
650    type Item = <<&'a T as IntoIterator>::IntoIter as Iterator>::Item;
651
652    type IntoIter = <&'a T as IntoIterator>::IntoIter;
653
654    fn into_iter(self) -> Self::IntoIter {
655        self.deref().into_iter()
656    }
657}
658
659/// Contains the nodes/functions and specifies their execution order.
660#[derive(Default)]
661pub struct Execution {
662    barrier: usize,
663    unscheduled: Vec<Node<Function, TypeKey>>,
664    schedule: Vec<Vec<Node<Function, TypeKey>>>,
665}
666
667impl Execution {
668    /// Returns the number of nodes.
669    pub fn len(&self) -> usize {
670        self.unscheduled.len() + self.schedule.iter().map(|batch| batch.len()).sum::<usize>()
671    }
672
673    /// Returns whether this execution is empty.
674    pub fn is_empty(&self) -> bool {
675        self.len() == 0
676    }
677}
678
679pub struct BatchResult<'graph> {
680    nodes: &'graph mut Vec<Node<Function, TypeKey>>,
681    resources: &'graph mut TypeMap,
682    results: Vec<Result<Resource, GraphError>>,
683}
684
685impl<'graph> BatchResult<'graph> {
686    /// Save the results of the batch run to the graph.
687    ///
688    /// Optionally trim any nodes that report a [`GraphError::TrimNode`] result.
689    ///
690    /// Optionally unifies resources.
691    ///
692    /// Returns `true` if any nodes were trimmed.
693    pub fn save(
694        self,
695        should_trim_nodes: bool,
696        should_unify_resources: bool,
697    ) -> Result<bool, GraphError> {
698        let BatchResult {
699            nodes,
700            resources,
701            results,
702        } = self;
703        let mut trimmed_any = false;
704        let mut trimmings = vec![false; nodes.len()];
705        for ((node, output), should_trim) in nodes.iter().zip(results).zip(trimmings.iter_mut()) {
706            match output {
707                Err(GraphError::TrimNode) => {
708                    // TrimNode is special in that it is not really an error.
709                    // Instead it means that the system which returned TrimNode should be
710                    // removed from the graph because its work is done.
711                    *should_trim = true;
712                    trimmed_any = should_trim_nodes;
713                }
714                Err(o) => {
715                    // A system hit an unrecoverable error.
716                    log::error!("node '{}' erred: {}", node.name(), o);
717                    return Err(o);
718                }
719                Ok(output) => (node.inner().save)(output, resources)?,
720            }
721        }
722
723        if trimmed_any {
724            let mut n = 0;
725            nodes.retain_mut(|_| {
726                let should_trim = trimmings[n];
727                n += 1;
728                !should_trim
729            });
730        }
731
732        if should_unify_resources {
733            resources.unify().context(ResourceSnafu)?;
734        }
735
736        Ok(trimmed_any)
737    }
738}
739
740pub struct Batch<'graph> {
741    nodes: &'graph mut Vec<Node<Function, TypeKey>>,
742    resources: &'graph mut TypeMap,
743    //inputs: Vec<Resource>,
744    //runs: Vec<&'graph Box<dyn Fn(Resource) -> Result<Resource, GraphError> + Send + Sync>>,
745    // local: Option<(
746    //     Resource,
747    //     Box<dyn FnOnce(Resource) -> Result<Resource, GraphError> + 'local>,
748    // )>,
749}
750
751impl<'a> Batch<'a> {
752    /// Create a new [`Batch`] with a local function.
753    pub fn new(resources: &'a mut TypeMap, nodes: &'a mut Vec<Node<Function, TypeKey>>) -> Self {
754        Batch { resources, nodes }
755    }
756
757    #[cfg(feature = "parallel")]
758    pub fn run(
759        self,
760        local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
761    ) -> Result<BatchResult<'a>, GraphError> {
762        use rayon::prelude::*;
763
764        let Batch { nodes, resources } = self;
765
766        let mut local_f = None;
767        let mut inputs = vec![];
768        let mut runs = vec![];
769        for node in nodes.iter_mut() {
770            log::trace!("preparing node '{}'", node.name());
771            let input = (node.inner().prepare)(resources)?;
772            if let Some(f) = node.inner_mut().run.as_mut() {
773                inputs.push(input);
774                runs.push(f);
775            } else {
776                let f = local.take().context(MissingLocalSnafu)?;
777                local_f = Some((
778                    input,
779                    Box::new(f) as Box<dyn FnOnce(Resource) -> Result<Resource, GraphError>>,
780                ));
781            }
782        }
783
784        let mut results = inputs
785            .into_par_iter()
786            .zip(runs.into_par_iter())
787            .map(|(input, f)| (f)(input))
788            .collect::<Vec<_>>();
789
790        if let Some((input, f)) = local_f {
791            results.push((f)(input));
792        }
793
794        Ok(BatchResult {
795            nodes,
796            results,
797            resources,
798        })
799    }
800
801    #[cfg(not(feature = "parallel"))]
802    pub fn run(
803        self,
804        local: &mut Option<impl FnOnce(Resource) -> Result<Resource, GraphError>>,
805    ) -> Result<BatchResult<'a>, GraphError> {
806        let Batch { nodes, resources } = self;
807
808        let mut local_f = None;
809        let mut inputs = vec![];
810        let mut runs = vec![];
811        for node in nodes.iter_mut() {
812            let input = (node.inner().prepare)(resources)?;
813            if let Some(f) = node.inner_mut().run.as_mut() {
814                inputs.push(input);
815                runs.push(f);
816            } else {
817                let f = local.take().context(MissingLocalSnafu)?;
818                local_f = Some((
819                    input,
820                    Box::new(f) as Box<dyn FnOnce(Resource) -> Result<Resource, GraphError>>,
821                ));
822            }
823        }
824
825        let mut outputs = inputs
826            .into_iter()
827            .zip(runs.into_iter())
828            .map(|(input, f)| (f)(input))
829            .collect::<Vec<_>>();
830
831        if let Some((input, f)) = local_f {
832            outputs.push((f)(input));
833        }
834
835        Ok(BatchResult {
836            nodes,
837            results: outputs,
838            resources,
839        })
840    }
841}
842
843/// Provides access to consecutive batches of scheduled nodes/functions.
844pub struct Batches<'graph> {
845    schedule: std::slice::IterMut<'graph, Vec<Node<Function, TypeKey>>>,
846    resources: &'graph mut TypeMap,
847}
848
849impl<'graph> Batches<'graph> {
850    /// Overwrite the batch's resources, allowing the schedule to operate on a separate
851    /// set of resources.
852    pub fn set_resources(&mut self, resources: &'graph mut TypeMap) {
853        self.resources = resources;
854    }
855
856    pub fn next_batch(&mut self) -> Option<Batch> {
857        let nodes: &'graph mut Vec<_> = self.schedule.next()?;
858        let batch = Batch::new(self.resources, nodes);
859        Some(batch)
860    }
861
862    /// Attempt to unify resources, returning `true` when unification was successful
863    /// or `false` when resources are still loaned.
864    pub fn unify(&mut self) -> bool {
865        self.resources.unify().is_ok()
866    }
867
868    /// Return the number of batches remaining in the schedule.
869    pub fn len(&self) -> usize {
870        self.schedule.len()
871    }
872
873    /// Returns `true` if there are no more batches.
874    pub fn is_empty(&self) -> bool {
875        self.len() == 0
876    }
877}
878
/// An acyclic, directed graph made up of nodes/functions and edges/resources.
///
/// Notably nodes may have additional run requirements added to them besides
/// input/output requirements. These include:
///
/// * barriers
/// * run before node
/// * run after node
///
/// See the module documentation for [`Node`] for more info on constructing
/// nodes with granular constraints.
#[derive(Default)]
pub struct Graph {
    // The edges of the graph: every resource, stored by type.
    resources: TypeMap,
    // The nodes of the graph, both unscheduled and scheduled into batches.
    execution: Execution,
}
895
896impl Graph {
897    pub fn _resources_mut(&mut self) -> &mut TypeMap {
898        &mut self.resources
899    }
900
901    /// Creates a graph node from an [`Fn`] closure.
902    ///
903    /// A node in the graph is a boxed Rust closure that may do any or all the
904    /// following:
905    ///
906    /// * Create resources by returning a result that implements
907    ///   [`NodeResults`].
908    /// * Consume one or more resources by having a field in the input parameter
909    ///   wrapped in [`Move`]. The resource will not be available in the graph
910    ///   after the node is run.
911    /// * Read one or more resources by having a field in the input parameter
912    ///   wrapped in [`View`].
913    /// * Write one or more resources by having a field in the input parameter
914    ///   wrapped in [`ViewMut`].
915    ///
916    /// By default `IsGraphNode` is implemented for functions that take one
917    /// parameter implementing [`Edges`] and returning a `Result` where the "ok"
918    /// type implements `NodeResults`.
919    pub fn node<Input, Output, F: IsGraphNode<Input, Output>>(f: F) -> Node<Function, TypeKey> {
920        f.into_node()
921    }
922
923    /// Creates a graph node without a closure, to be supplied later with
924    /// [`Graph::run_with_local`].
925    ///
926    /// The returned node may be added to a graph and scheduled, allowing
927    /// closures with local scope requirements to fit into the graph.
928    ///
929    /// At this time only one local node is allowed.
930    pub fn local<Input, Output>() -> Node<Function, TypeKey>
931    where
932        Input: Edges + Any + Send + Sync,
933        Output: NodeResults + Any + Send + Sync,
934    {
935        Node::new(Function {
936            prepare: Box::new(prepare::<Input>),
937            run: None,
938            save: Box::new(save::<Output>),
939        })
940        .with_reads(Input::reads())
941        .with_writes(Input::writes())
942        .with_moves(Input::moves())
943        .with_results(Output::creates())
944    }
945
946    #[deprecated(
947        since = "0.3.3",
948        note = "Ambiguous name. Replaced by `interleave_subgraph` and `add_subgraph`. Use \
949                Graph::interleave_subgraph instead as a direct replacment."
950    )]
951    /// Merge two graphs, preferring the right in cases of key collisions.
952    ///
953    /// The values of `rhs` will override those of `lhs`.
954    pub fn merge(mut lhs: Graph, rhs: Graph) -> Graph {
955        lhs.interleave_subgraph(rhs);
956        lhs
957    }
958
959    /// Add a subgraph, preferring the right in cases of key collisions.
960    ///
961    /// The values of `rhs` will override those of `self`.
962    ///
963    /// Barriers in each graph will be considered equal. This has the effect
964    /// that after adding the subgraph, nodes in `rhs` may run at the same
965    /// time as nodes in `lhs` if their barrier matches.
966    ///
967    /// This is analogous to adding each graph's nodes in an interleaving order,
968    /// sorted by their barrier.
969    ///
970    /// ## Example:
971    /// ```rust
972    /// use moongraph::{graph, Graph, GraphError, ViewMut};
973    ///
974    /// fn one(_: ()) -> Result<(), GraphError> {
975    ///     log::trace!("one");
976    ///     Ok(())
977    /// }
978    /// fn two(mut an_f32: ViewMut<f32>) -> Result<(), GraphError> {
979    ///     log::trace!("two");
980    ///     *an_f32 += 1.0;
981    ///     Ok(())
982    /// }
983    /// fn three(_: ()) -> Result<(), GraphError> {
984    ///     log::trace!("three");
985    ///     Ok(())
986    /// }
987    /// fn four(_: ()) -> Result<(), GraphError> {
988    ///     log::trace!("four");
989    ///     Ok(())
990    /// }
991    ///
992    /// let mut one_two = graph!(one < two).with_barrier();
993    /// assert_eq!(1, one_two.get_barrier());
994    /// let three_four = graph!(three < four);
995    /// assert_eq!(0, three_four.get_barrier());
996    /// one_two.interleave_subgraph(three_four);
997    /// one_two.reschedule().unwrap();
998    /// assert_eq!(
999    ///     vec![vec!["one", "three"], vec!["four", "two"]],
1000    ///     one_two.get_schedule()
1001    /// );
1002    /// ```
1003    pub fn interleave_subgraph(&mut self, mut rhs: Graph) -> &mut Self {
1004        self.unschedule();
1005        rhs.unschedule();
1006        let Graph {
1007            resources: mut rhs_resources,
1008            execution:
1009                Execution {
1010                    unscheduled: rhs_nodes,
1011                    barrier: _,
1012                    schedule: _,
1013                },
1014        } = rhs;
1015        self.resources
1016            .extend(std::mem::take(rhs_resources.deref_mut()));
1017        let mut unscheduled: HashMap<String, Node<Function, TypeKey>> = HashMap::default();
1018        let lhs_nodes = std::mem::take(&mut self.execution.unscheduled);
1019        unscheduled.extend(
1020            lhs_nodes
1021                .into_iter()
1022                .map(|node| (node.name().to_string(), node)),
1023        );
1024        unscheduled.extend(
1025            rhs_nodes
1026                .into_iter()
1027                .map(|node| (node.name().to_string(), node)),
1028        );
1029        self.execution.unscheduled = unscheduled.into_iter().map(|v| v.1).collect();
1030        self.execution.barrier = self.execution.barrier.max(rhs.execution.barrier);
1031        self
1032    }
1033
1034    /// Add a subgraph, preferring the right in cases of key collisions.
1035    ///
1036    /// The values of `rhs` will override those of `self`.
1037    ///
1038    /// Barriers will be kept in place, though barriers in `rhs` will be
1039    /// incremented by `self.barrier`. This has the effect that after adding the
1040    /// subgraph, nodes in `rhs` will run after the last barrier in `self`,
1041    /// or later if `rhs` has barriers of its own.
1042    ///
1043    /// This is analogous to adding all of the nodes in `rhs`, one by one, to
1044    /// `self` - while keeping the constraints of `rhs` in place.
1045    ///
1046    /// ## Example:
1047    /// ```rust
1048    /// use moongraph::{graph, Graph, GraphError, ViewMut};
1049    ///
1050    /// fn one(_: ()) -> Result<(), GraphError> {
1051    ///     log::trace!("one");
1052    ///     Ok(())
1053    /// }
1054    /// fn two(mut an_f32: ViewMut<f32>) -> Result<(), GraphError> {
1055    ///     log::trace!("two");
1056    ///     *an_f32 += 1.0;
1057    ///     Ok(())
1058    /// }
1059    /// fn three(_: ()) -> Result<(), GraphError> {
1060    ///     log::trace!("three");
1061    ///     Ok(())
1062    /// }
1063    /// fn four(_: ()) -> Result<(), GraphError> {
1064    ///     log::trace!("four");
1065    ///     Ok(())
1066    /// }
1067    ///
1068    /// let mut one_two = graph!(one < two).with_barrier();
1069    /// assert_eq!(1, one_two.get_barrier());
1070    /// let three_four = graph!(three < four);
1071    /// assert_eq!(0, three_four.get_barrier());
1072    /// one_two.add_subgraph(three_four);
1073    /// one_two.reschedule().unwrap();
1074    /// assert_eq!(
1075    ///     vec![vec!["one"], vec!["two"], vec!["three"], vec!["four"]],
1076    ///     one_two.get_schedule()
1077    /// );
1078    /// ```
1079    pub fn add_subgraph(&mut self, mut rhs: Graph) -> &mut Self {
1080        self.unschedule();
1081        rhs.unschedule();
1082        let Graph {
1083            resources: mut rhs_resources,
1084            execution:
1085                Execution {
1086                    unscheduled: rhs_nodes,
1087                    barrier: rhs_barrier,
1088                    schedule: _,
1089                },
1090        } = rhs;
1091        let base_barrier = self.execution.barrier;
1092        self.execution.barrier = base_barrier + rhs_barrier;
1093        self.resources
1094            .extend(std::mem::take(rhs_resources.deref_mut()));
1095        self.execution
1096            .unscheduled
1097            .extend(rhs_nodes.into_iter().map(|node| {
1098                let barrier = node.get_barrier();
1099                node.with_barrier(base_barrier + barrier)
1100            }));
1101
1102        self
1103    }
1104
1105    /// Proxy for [`Graph::add_subgraph`] with `self` chaining.
1106    pub fn with_subgraph(mut self, rhs: Graph) -> Self {
1107        self.add_subgraph(rhs);
1108        self
1109    }
1110
1111    /// Unschedule all functions.
1112    fn unschedule(&mut self) {
1113        self.execution.unscheduled.extend(
1114            std::mem::take(&mut self.execution.schedule)
1115                .into_iter()
1116                .flatten(),
1117        );
1118    }
1119
1120    /// Reschedule all functions.
1121    ///
1122    /// If the functions were already scheduled this will unscheduled them
1123    /// first.
1124    pub fn reschedule(&mut self) -> Result<(), GraphError> {
1125        log::trace!("rescheduling the render graph:");
1126        self.unschedule();
1127        let all_nodes = std::mem::take(&mut self.execution.unscheduled);
1128        let dag = all_nodes
1129            .into_iter()
1130            .fold(Dag::default(), |dag, node| dag.with_node(node));
1131        let schedule =
1132            dag.build_schedule()
1133                .map_err(|dagga::BuildScheduleError { source, mut dag }| {
1134                    // we have to put the nodes back so the library user can do debugging
1135                    for node in dag.take_nodes() {
1136                        self.add_node(node);
1137                    }
1138                    GraphError::Scheduling { source }
1139                })?;
1140        self.execution.schedule = schedule.batches;
1141        // Order the nodes in each batch by node name so they are deterministic.
1142        for batch in self.execution.schedule.iter_mut() {
1143            batch.sort_by(|a, b| a.name().cmp(b.name()));
1144        }
1145
1146        let batched_names = self.get_schedule_and_resources();
1147        log::trace!("{:#?}", batched_names);
1148        Ok(())
1149    }
1150
1151    /// Return the names of scheduled nodes.
1152    ///
1153    /// If no nodes have been scheduled this will return an empty vector.
1154    ///
1155    /// Use [`Graph::reschedule`] to manually schedule the nodes before calling
1156    /// this.
1157    pub fn get_schedule(&self) -> Vec<Vec<&str>> {
1158        self.execution
1159            .schedule
1160            .iter()
1161            .map(|batch| batch.iter().map(|node| node.name()).collect())
1162            .collect()
1163    }
1164
1165    /// Return the names of scheduled nodes along with the names of their resources.
1166    ///
1167    /// If no nodes have been scheduled this will return an empty vector.
1168    ///
1169    /// Use [`Graph::reschedule`] to manually schedule the nodes before calling
1170    /// this.
1171    pub fn get_schedule_and_resources(&self) -> Vec<Vec<(&str, Vec<&str>)>> {
1172        self.execution
1173            .schedule
1174            .iter()
1175            .map(|batch| {
1176                batch
1177                    .iter()
1178                    .map(|node| {
1179                        let name = node.name();
1180                        let inputs = node
1181                            .all_inputs()
1182                            .into_iter()
1183                            .map(|key| key.name())
1184                            .collect();
1185                        (name, inputs)
1186                    })
1187                    .collect()
1188            })
1189            .collect()
1190    }
1191
1192    /// An iterator over all nodes.
1193    pub fn nodes(&self) -> impl Iterator<Item = &Node<Function, TypeKey>> {
1194        self.execution
1195            .schedule
1196            .iter()
1197            .flatten()
1198            .chain(self.execution.unscheduled.iter())
1199    }
1200
1201    /// A mutable iterator over all nodes.
1202    pub fn nodes_mut(&mut self) -> impl Iterator<Item = &mut Node<Function, TypeKey>> {
1203        self.execution
1204            .schedule
1205            .iter_mut()
1206            .flatten()
1207            .chain(self.execution.unscheduled.iter_mut())
1208    }
1209
1210    /// Add multiple nodes to this graph.
1211    pub fn with_nodes(self, nodes: impl IntoIterator<Item = Node<Function, TypeKey>>) -> Self {
1212        nodes.into_iter().fold(self, Self::with_node)
1213    }
1214
1215    /// Add a node to the graph.
1216    pub fn add_node(&mut self, node: Node<Function, TypeKey>) {
1217        self.execution
1218            .unscheduled
1219            .push(node.runs_after_barrier(self.execution.barrier));
1220    }
1221
1222    /// Return a reference to the node with the given name, if possible.
1223    pub fn get_node(&self, name: impl AsRef<str>) -> Option<&Node<Function, TypeKey>> {
1224        self.nodes().find(|&node| node.name() == name.as_ref())
1225    }
1226
1227    /// Return a mutable reference to the node with the given name, if possible.
1228    pub fn get_node_mut(&mut self, name: impl AsRef<str>) -> Option<&mut Node<Function, TypeKey>> {
1229        self.nodes_mut().find(|node| node.name() == name.as_ref())
1230    }
1231
1232    /// Remove a node from the graph by name.
1233    ///
1234    /// This leaves the graph in an unscheduled state.
1235    pub fn remove_node(&mut self, name: impl AsRef<str>) -> Option<Node<Function, TypeKey>> {
1236        self.unschedule();
1237        let mut may_index = None;
1238        for (i, node) in self.execution.unscheduled.iter().enumerate() {
1239            if node.name() == name.as_ref() {
1240                may_index = Some(i);
1241            }
1242        }
1243        if let Some(i) = may_index.take() {
1244            Some(self.execution.unscheduled.swap_remove(i))
1245        } else {
1246            None
1247        }
1248    }
1249
1250    /// Add a node to the graph.
1251    pub fn with_node(mut self, node: Node<Function, TypeKey>) -> Self {
1252        self.add_node(node);
1253        self
1254    }
1255
1256    /// Add a named function to the graph.
1257    pub fn with_function<Input, Output>(
1258        mut self,
1259        name: impl Into<String>,
1260        f: impl IsGraphNode<Input, Output>,
1261    ) -> Self {
1262        self.add_function(name, f);
1263        self
1264    }
1265
1266    /// Add a named function to the graph.
1267    pub fn add_function<Input, Output>(
1268        &mut self,
1269        name: impl Into<String>,
1270        f: impl IsGraphNode<Input, Output>,
1271    ) {
1272        self.add_node(f.into_node().with_name(name));
1273    }
1274
1275    /// Return whether the graph contains a node/function with the given name.
1276    pub fn contains_node(&self, name: impl AsRef<str>) -> bool {
1277        let name = name.as_ref();
1278        let search = |node: &Node<Function, TypeKey>| node.name() == name;
1279        if self.execution.unscheduled.iter().any(search) {
1280            return true;
1281        }
1282        self.execution.schedule.iter().flatten().any(search)
1283    }
1284
1285    /// Return whether the graph contains a resource with the parameterized
1286    /// type.
1287    pub fn contains_resource<T: Any + Send + Sync>(&self) -> bool {
1288        let key = TypeKey::new::<T>();
1289        self.resources.contains_key(&key)
1290    }
1291
1292    /// Explicitly insert a resource (an edge) into the graph.
1293    ///
1294    /// This will overwrite an existing resource of the same type in the graph.
1295    pub fn with_resource<T: Any + Send + Sync>(mut self, t: T) -> Self {
1296        self.add_resource(t);
1297        self
1298    }
1299
1300    /// Explicitly insert a resource (an edge) into the graph.
1301    ///
1302    /// This will overwrite an existing resource of the same type in the graph.
1303    pub fn add_resource<T: Any + Send + Sync>(&mut self, t: T) {
1304        // UNWRAP: safe because of the guarantees around `insert_value`
1305        self.resources.insert_value(t).unwrap();
1306    }
1307
1308    /// Add a barrier to the graph.
1309    ///
1310    /// All nodes added after the barrier will run after nodes added before the
1311    /// barrier.
1312    pub fn add_barrier(&mut self) {
1313        self.execution.barrier += 1;
1314    }
1315
1316    /// Add a barrier to the graph.
1317    ///
1318    /// All nodes added after the barrier will run after nodes added before the
1319    /// barrier.
1320    pub fn with_barrier(mut self) -> Self {
1321        self.add_barrier();
1322        self
1323    }
1324
1325    /// Return the current barrier.
1326    ///
1327    /// This will be the barrier for any added nodes.
1328    pub fn get_barrier(&self) -> usize {
1329        self.execution.barrier
1330    }
1331
1332    /// Add a locally run function to the graph by adding its name, input and
1333    /// output params.
1334    ///
1335    /// There may be only one locally run function.
1336    ///
1337    /// If a graph contains a local function the graph _MUST_ be run with
1338    /// [`Graph::run_with_local`].
1339    pub fn add_local<Input, Output>(&mut self, name: impl Into<String>)
1340    where
1341        Input: Edges + Any + Send + Sync,
1342        Output: NodeResults + Any + Send + Sync,
1343    {
1344        self.add_node(Self::local::<Input, Output>().with_name(name));
1345    }
1346
1347    pub fn with_local<Input, Output>(mut self, name: impl Into<String>) -> Self
1348    where
1349        Input: Edges + Any + Send + Sync,
1350        Output: NodeResults + Any + Send + Sync,
1351    {
1352        self.add_local::<Input, Output>(name);
1353        self
1354    }
1355
1356    /// Reschedule the graph **only if there are unscheduled nodes**.
1357    ///
1358    /// Returns an error if a schedule cannot be built.
1359    pub fn reschedule_if_necessary(&mut self) -> Result<(), GraphError> {
1360        if !self.execution.unscheduled.is_empty() {
1361            self.reschedule()?;
1362        }
1363        Ok(())
1364    }
1365
1366    /// Return an iterator over prepared schedule-batches.
1367    ///
1368    /// The graph should be scheduled ahead of calling this function.
1369    pub fn batches(&mut self) -> Batches {
1370        Batches {
1371            schedule: self.execution.schedule.iter_mut(),
1372            resources: &mut self.resources,
1373        }
1374    }
1375
1376    /// Run the graph.
1377    pub fn run(&mut self) -> Result<(), GraphError> {
1378        self.run_with_local(missing_local)
1379    }
1380
1381    /// Run the graph with the given local function.
1382    pub fn run_with_local<Input, Output, E>(
1383        &mut self,
1384        f: impl FnOnce(Input) -> Result<Output, E>,
1385    ) -> Result<(), GraphError>
1386    where
1387        Input: Edges + Any + Send + Sync,
1388        Output: NodeResults + Any + Send + Sync,
1389        E: ToString,
1390    {
1391        let mut local = Some(Box::new(move |resources: Resource| {
1392            let input = *resources.downcast::<Input>().unwrap();
1393            match (f)(input) {
1394                Ok(creates) => Ok(Box::new(creates) as Resource),
1395                Err(e) => Err(GraphError::RunningLocalNode {
1396                    error: e.to_string(),
1397                }),
1398            }
1399        })
1400            as Box<dyn FnOnce(Resource) -> Result<Resource, GraphError>>);
1401
1402        self.reschedule_if_necessary()?;
1403
1404        let mut got_trimmed = false;
1405        let mut batches = self.batches();
1406        while let Some(batch) = batches.next_batch() {
1407            let batch_result = batch.run(&mut local)?;
1408            let did_trim_batch = batch_result.save(true, true)?;
1409            got_trimmed = got_trimmed || did_trim_batch;
1410        }
1411        if got_trimmed {
1412            self.reschedule()?;
1413        }
1414
1415        Ok(())
1416    }
1417
1418    /// Remove a resource from the graph.
1419    ///
1420    /// Returns an error if the requested resource is loaned, and cannot be removed.
1421    pub fn remove_resource<T: Any + Send + Sync>(&mut self) -> Result<Option<T>, GraphError> {
1422        let key = TypeKey::new::<T>();
1423        if let Some(inner_loan) = self.resources.remove(&key) {
1424            match inner_loan.into_owned(key.name()) {
1425                Ok(value) => {
1426                    // UNWRAP: safe because we got this out as `T`, and it can only be stored
1427                    // as `T`
1428                    let box_t = value.downcast::<T>().unwrap();
1429                    Ok(Some(*box_t))
1430                }
1431                Err(loan) => {
1432                    self.resources.insert(key, loan);
1433                    let err = ResourceLoanedSnafu {
1434                        type_name: std::any::type_name::<T>(),
1435                    }
1436                    .build();
1437                    log::error!("{err}");
1438                    Err(err)
1439                }
1440            }
1441        } else {
1442            // There is no such resource
1443            Ok(None)
1444        }
1445    }
1446
1447    /// Get a reference to a resource in the graph.
1448    ///
1449    /// If the resource _does not_ exist `Ok(None)` will be returned.
1450    pub fn get_resource<T: Any + Send + Sync>(&self) -> Result<Option<&T>, GraphError> {
1451        self.resources.get_value().context(ResourceSnafu)
1452    }
1453
1454    /// Get a mutable reference to a resource in the graph.
1455    ///
1456    /// If the resource _does not_ exist `Ok(None)` will be returned.
1457    pub fn get_resource_mut<T: Any + Send + Sync>(&mut self) -> Result<Option<&mut T>, GraphError> {
1458        self.resources.get_value_mut().context(ResourceSnafu)
1459    }
1460
1461    /// Fetch graph edges and visit them with a closure.
1462    ///
1463    /// This is like running a one-off graph node, but `S` does not get packed
1464    /// into the graph as a result resource, instead it is given back to the
1465    /// callsite.
1466    ///
1467    /// ## Note
1468    /// By design, visiting the graph with a type that uses `Move` in one of its
1469    /// fields will result in the wrapped type of that field being `move`d
1470    /// **out** of the graph. The resource will no longer be available
1471    /// within the graph.
1472    ///
1473    /// ```rust
1474    /// use moongraph::*;
1475    /// use snafu::prelude::*;
1476    ///
1477    /// #[derive(Debug, Snafu)]
1478    /// enum TestError {}
1479    ///
1480    /// #[derive(Edges)]
1481    /// struct Input {
1482    ///     num_usize: View<usize>,
1483    ///     num_f32: ViewMut<f32>,
1484    ///     num_f64: Move<f64>,
1485    /// }
1486    ///
1487    /// // pack the graph with resources
1488    /// let mut graph = Graph::default()
1489    ///     .with_resource(0usize)
1490    ///     .with_resource(0.0f32)
1491    ///     .with_resource(0.0f64);
1492    ///
1493    /// // visit the graph, reading, modifying and _moving_!
1494    /// let num_usize = graph.visit(|mut input: Input| {
1495    ///     *input.num_f32 = 666.0;
1496    ///     *input.num_f64 += 10.0;
1497    ///     *input.num_usize
1498    /// }).unwrap();
1499    ///
1500    /// // observe we read usize
1501    /// assert_eq!(0, num_usize);
1502    /// assert_eq!(0, *graph.get_resource::<usize>().unwrap().unwrap());
1503    ///
1504    /// // observe we modified f32
1505    /// assert_eq!(666.0, *graph.get_resource::<f32>().unwrap().unwrap());
1506    ///
1507    /// // observe we moved f64 out of the graph and it is no longer present
1508    /// assert!(!graph.contains_resource::<f64>());
1509    pub fn visit<T: Edges, S>(&mut self, f: impl FnOnce(T) -> S) -> Result<S, GraphError> {
1510        let t = T::construct(&mut self.resources)?;
1511        let s = f(t);
1512        self.resources.unify().context(ResourceSnafu)?;
1513        Ok(s)
1514    }
1515
1516    /// Split the graph into an execution (schedule of functions/nodes) and resources.
1517    pub fn into_parts(self) -> (Execution, TypeMap) {
1518        (self.execution, self.resources)
1519    }
1520
1521    #[cfg(feature = "dot")]
1522    /// Save the graph to the filesystem as a dot file to be visualized with
1523    /// graphiz (or similar).
1524    pub fn save_graph_dot(&self, path: &str) {
1525        use dagga::dot::DagLegend;
1526
1527        let legend =
1528            DagLegend::new(self.nodes()).with_resources_named(|ty: &TypeKey| ty.name().to_string());
1529        legend.save_to(path).unwrap();
1530    }
1531
1532    /// Internal function used in the [`graph!`] macro.
1533    pub fn _add_node_constraint(
1534        constraint: &str,
1535        i: &mut Node<Function, TypeKey>,
1536        j: Option<String>,
1537    ) {
1538        //
1539        match constraint {
1540            ">" => {
1541                i.add_runs_after(j.unwrap());
1542            }
1543            "<" => {
1544                i.add_runs_before(j.unwrap());
1545            }
1546            _ => {}
1547        }
1548    }
1549
1550    pub fn _last_node(&self) -> Option<String> {
1551        self.execution
1552            .unscheduled
1553            .last()
1554            .map(|node| node.name().to_string())
1555    }
1556
1557    pub fn node_len(&self) -> usize {
1558        self.execution.len()
1559    }
1560
1561    /// Pop off the next batch of nodes from the schedule.
1562    ///
1563    /// The graph must be scheduled first.
1564    pub fn take_next_batch_of_nodes(&mut self) -> Option<Vec<Node<Function, TypeKey>>> {
1565        if self.execution.schedule.is_empty() {
1566            None
1567        } else {
1568            self.execution.schedule.drain(0..1).next()
1569        }
1570    }
1571}
1572
/// Constructs a [`Graph`] using an intuitive shorthand for node ordering
/// relationships.
///
/// The input is a list of function names separated by operators:
///
/// * `a < b` - `a` runs before `b`
/// * `a > b` - `a` runs after `b`
/// * `a, b` - no ordering between `a` and `b`
///
/// Each operator relates the name on its left to the name directly on its
/// right. A name may appear more than once; it is added to the graph only
/// once and collects the constraints of every occurrence. A trailing comma
/// is allowed.
///
/// Every function becomes a node named after its identifier.
///
/// ## Example:
/// ```rust
/// # use moongraph::{Graph, graph, GraphError, ViewMut};
///
/// fn one(_: ()) -> Result<(), GraphError> {
///     log::trace!("one");
///     Ok(())
/// }
/// fn two(mut an_f32: ViewMut<f32>) -> Result<(), GraphError> {
///     log::trace!("two");
///     *an_f32 += 1.0;
///     Ok(())
/// }
/// fn three(_: ()) -> Result<(), GraphError> {
///     log::trace!("three");
///     Ok(())
/// }
///
/// let _a = graph!(one < two, three, three > two);
/// let _b = graph!(one, two);
/// let _c = graph!(one < two);
/// let _d = graph!(one);
/// let _e = graph!(one < two < three);
/// let _f = graph!(one, two,);
///
/// let mut g = graph!(one < two < three).with_resource(0.0f32);
/// g.reschedule().unwrap();
/// let schedule = g.get_schedule();
/// assert_eq!(vec![vec!["one"], vec!["two"], vec!["three"]], schedule);
/// ```
#[macro_export]
macro_rules! graph {
    // Terminal case: a single node, optionally followed by a trailing comma.
    // This arm has to come first. Otherwise the recursive arm would take the
    // trailing comma as `$op` and recurse on an empty token list.
    ($i:ident $(,)?) => {
        $crate::Graph::default().with_node($crate::Graph::node($i).with_name(stringify!($i)))
    };

    // Recursive case: build the graph for everything right of the operator,
    // then constrain `$i` against `$next`, the name directly right of it.
    ($i:ident $op:tt $next:ident $($tail:tt)*) => {{
        let mut g = $crate::graph!($next $($tail)*);
        // The neighbour is named explicitly. Asking the graph for its last
        // node would be wrong when `$next` was already in the graph, because
        // an existing node is updated in place and is not moved to the end.
        let neighbour = Some(stringify!($next).to_string());
        if let Some(node) = g.get_node_mut(stringify!($i)) {
            $crate::Graph::_add_node_constraint(stringify!($op), node, neighbour);
        } else {
            g.add_node({
                let mut node = $crate::Graph::node($i).with_name(stringify!($i));
                $crate::Graph::_add_node_constraint(stringify!($op), &mut node, neighbour);
                node
            });
        }
        g
    }};
}
1626
1627#[cfg(test)]
1628mod test {
1629    use super::*;
1630
1631    fn create(_: ()) -> Result<(usize,), GraphError> {
1632        Ok((0,))
1633    }
1634
1635    fn edit((mut num,): (ViewMut<usize>,)) -> Result<(), GraphError> {
1636        *num += 1;
1637        Ok(())
1638    }
1639
1640    fn finish((num,): (Move<usize>,)) -> Result<(), GraphError> {
1641        assert_eq!(1, num.into(), "edit did not run");
1642        Ok(())
1643    }
1644
1645    #[test]
1646    fn function_to_node() {
1647        // sanity test
1648        let mut graph = Graph::default()
1649            .with_function("create", create)
1650            .with_function("edit", edit);
1651        graph.run().unwrap();
1652        assert_eq!(1, *graph.get_resource::<usize>().unwrap().unwrap());
1653
1654        let mut graph = graph.with_function("finish", finish);
1655        graph.run().unwrap();
1656        assert!(graph.get_resource::<usize>().unwrap().is_none());
1657    }
1658
1659    #[test]
1660    fn many_inputs_many_outputs() {
1661        use crate as moongraph;
1662        // tests our Edges and NodeResults impl macros
1663        fn start(_: ()) -> Result<(usize, u32, f32, f64, &'static str, String), GraphError> {
1664            Ok((0, 0, 0.0, 0.0, "hello", "HELLO".into()))
1665        }
1666
1667        fn modify_ints(
1668            (mut numusize, mut numu32): (ViewMut<usize>, ViewMut<u32>),
1669        ) -> Result<(), GraphError> {
1670            *numusize += 1;
1671            *numu32 += 1;
1672            Ok(())
1673        }
1674
1675        fn modify_floats(
1676            (mut numf32, mut numf64): (ViewMut<f32>, ViewMut<f64>),
1677        ) -> Result<(), GraphError> {
1678            *numf32 += 10.0;
1679            *numf64 += 10.0;
1680            Ok(())
1681        }
1682
1683        fn modify_strings(
1684            (mut strstatic, mut strowned): (ViewMut<&'static str>, ViewMut<String>),
1685        ) -> Result<(), GraphError> {
1686            *strstatic = "goodbye";
1687            *strowned = "GOODBYE".into();
1688            Ok(())
1689        }
1690
1691        #[derive(Edges)]
1692        struct EndData(
1693            Move<usize>,
1694            Move<u32>,
1695            Move<f32>,
1696            Move<f64>,
1697            Move<&'static str>,
1698            Move<String>,
1699        );
1700
1701        fn end(
1702            EndData(nusize, nu32, nf32, nf64, sstatic, sowned): EndData,
1703        ) -> Result<(bool,), GraphError> {
1704            assert_eq!(1, *nusize);
1705            assert_eq!(1, *nu32);
1706            assert_eq!(10.0, *nf32);
1707            assert_eq!(10.0, *nf64);
1708            assert_eq!("goodbye", *sstatic);
1709            assert_eq!("GOODBYE", *sowned);
1710            Ok((true,))
1711        }
1712
1713        let mut graph = Graph::default()
1714            .with_function("start", start)
1715            .with_function("modify_ints", modify_ints)
1716            .with_function("modify_floats", modify_floats)
1717            .with_function("modify_strings", modify_strings)
1718            .with_function("end", end);
1719
1720        graph.reschedule().unwrap();
1721        let schedule = graph.get_schedule();
1722        assert_eq!(
1723            vec![
1724                vec!["start"],
1725                vec!["modify_floats", "modify_ints", "modify_strings"],
1726                vec!["end"]
1727            ],
1728            schedule,
1729            "schedule is wrong"
1730        );
1731
1732        graph.run().unwrap();
1733        let run_was_all_good = graph.get_resource::<bool>().unwrap().unwrap();
1734        assert!(run_was_all_good, "run was not all good");
1735    }
1736
1737    #[test]
1738    fn can_derive() {
1739        #[derive(Edges)]
1740        #[moongraph(crate = crate)]
1741        struct Input {
1742            num_usize: View<usize>,
1743            num_f32: ViewMut<f32>,
1744            num_f64: Move<f64>,
1745        }
1746
1747        type Output = (String, &'static str);
1748
1749        fn start(_: ()) -> Result<(usize, f32, f64), GraphError> {
1750            Ok((1, 0.0, 10.0))
1751        }
1752
1753        fn end(mut input: Input) -> Result<Output, GraphError> {
1754            *input.num_f32 += *input.num_f64 as f32;
1755            Ok((
1756                format!("{},{},{}", *input.num_usize, *input.num_f32, *input.num_f64),
1757                "done",
1758            ))
1759        }
1760
1761        let mut graph = Graph::default()
1762            .with_function("start", start)
1763            .with_function("end", end);
1764        graph.run().unwrap();
1765        assert_eq!(
1766            "1,10,10",
1767            graph.get_resource::<String>().unwrap().unwrap().as_str()
1768        );
1769    }
1770
1771    #[test]
1772    fn can_visit_and_then_borrow() {
1773        use crate as moongraph;
1774
1775        #[derive(Edges)]
1776        struct Input {
1777            num_usize: View<usize>,
1778            num_f32: ViewMut<f32>,
1779            num_f64: Move<f64>,
1780        }
1781
1782        let mut graph = Graph::default()
1783            .with_resource(0usize)
1784            .with_resource(0.0f32)
1785            .with_resource(0.0f64);
1786        let num_usize = graph
1787            .visit(|mut input: Input| {
1788                *input.num_f32 = 666.0;
1789                *input.num_f64 += 10.0;
1790                *input.num_usize
1791            })
1792            .unwrap();
1793        assert_eq!(0, num_usize);
1794        assert_eq!(0, *graph.get_resource::<usize>().unwrap().unwrap());
1795        assert_eq!(666.0, *graph.get_resource::<f32>().unwrap().unwrap());
1796        assert!(!graph.contains_resource::<f64>());
1797    }
1798
1799    #[cfg(feature = "none")]
1800    #[test]
1801    fn can_run_local() {
1802        fn start(_: ()) -> Result<(usize, u32, f32, f64, &'static str, String), GraphError> {
1803            Ok((0, 0, 0.0, 0.0, "hello", "HELLO".into()))
1804        }
1805
1806        fn modify_ints(
1807            (mut numusize, mut numu32): (ViewMut<usize>, ViewMut<u32>),
1808        ) -> Result<(), GraphError> {
1809            *numusize += 1;
1810            *numu32 += 1;
1811            Ok(())
1812        }
1813
1814        fn modify_floats(
1815            (mut numf32, mut numf64): (ViewMut<f32>, ViewMut<f64>),
1816        ) -> Result<(), GraphError> {
1817            *numf32 += 10.0;
1818            *numf64 += 10.0;
1819            Ok(())
1820        }
1821
1822        fn modify_strings(
1823            (mut strstatic, mut strowned): (ViewMut<&'static str>, ViewMut<String>),
1824        ) -> Result<(), GraphError> {
1825            *strstatic = "goodbye";
1826            *strowned = "GOODBYE".into();
1827            Ok(())
1828        }
1829
1830        fn end(
1831            (nusize, nu32, nf32, nf64, sstatic, sowned): (
1832                Move<usize>,
1833                Move<u32>,
1834                Move<f32>,
1835                Move<f64>,
1836                Move<&'static str>,
1837                Move<String>,
1838            ),
1839        ) -> Result<(bool,), GraphError> {
1840            assert_eq!(1, *nusize);
1841            assert_eq!(10, *nu32);
1842            assert_eq!(100.0, *nf32);
1843            assert_eq!(10.0, *nf64);
1844            assert_eq!("goodbye", *sstatic);
1845            assert_eq!("GOODBYE", *sowned);
1846            Ok((true,))
1847        }
1848
1849        let mut graph = graph!(start, modify_ints, modify_floats, modify_strings, end,)
1850            .with_local::<(ViewMut<u32>, ViewMut<f32>), ()>("local");
1851
1852        graph.reschedule().unwrap();
1853        assert_eq!(
1854            vec![
1855                vec!["start"],
1856                vec!["modify_strings", "modify_floats", "modify_ints"],
1857                vec!["local"],
1858                vec!["end"]
1859            ],
1860            graph.get_schedule(),
1861            "schedule is wrong"
1862        );
1863
1864        let mut my_num = 0.0;
1865        graph
1866            .run_with_local(
1867                |(mut nu32, mut nf32): (ViewMut<u32>, ViewMut<f32>)| -> Result<(), String> {
1868                    *nu32 *= 10;
1869                    *nf32 *= 10.0;
1870                    my_num = *nu32 as f32 + *nf32;
1871                    Ok(())
1872                },
1873            )
1874            .unwrap();
1875        let run_was_all_good = graph.get_resource::<bool>().unwrap().unwrap();
1876        assert!(run_was_all_good, "run was not all good");
1877        assert_eq!(110.0, my_num, "local did not run");
1878    }
1879
1880    #[test]
1881    // Tests that Gen will generate a default for a missing resource,
1882    // and that the result will be stored in the graph.
1883    fn can_generate_view_default() {
1884        let mut graph = Graph::default();
1885        let u = graph.visit(|u: View<usize>| *u).unwrap();
1886        assert_eq!(0, u);
1887
1888        let my_u = graph.get_resource::<usize>().unwrap();
1889        assert_eq!(Some(0), my_u.copied());
1890    }
1891}