lemon_graph/nodes/
mod.rs

1use petgraph::{graph::NodeIndex, visit::EdgeRef, Direction};
2use std::future::Future;
3use thiserror::Error;
4
5mod log;
6
7pub use log::LogNode;
8
9use crate::{Graph, GraphEdge, GraphNode, Value};
10
11#[derive(Debug, Error)]
12pub enum NodeError {
13    #[error("Missing input at index {0}")]
14    MissingInput(usize),
15    #[error("Conversion error, got {0:?}")]
16    ConversionError(Value),
17    #[error("Internal error: {0}")]
18    InternalError(String),
19}
20
21pub trait AsyncNode {
22    fn run(
23        &self,
24        inputs: Vec<Value>,
25    ) -> Box<dyn Future<Output = Result<Vec<Value>, NodeError>> + Unpin>;
26}
27
28pub trait SyncNode {
29    fn run(&self, inputs: Vec<Value>) -> Result<Vec<Value>, NodeError>;
30}
31
32pub trait NodeWrapper: Copy + Into<NodeIndex> {
33    fn input_stores(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
34        graph
35            .edges_directed(self.into(), Direction::Incoming)
36            .filter(|edge| matches!(edge.weight(), GraphEdge::DataMap(_)))
37            .map(|edge| StoreWrapper(edge.source()))
38    }
39    fn output_stores(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
40        graph
41            .edges_directed(self.into(), Direction::Outgoing)
42            .filter(|edge| matches!(edge.weight(), GraphEdge::DataMap(_)))
43            .map(|edge| StoreWrapper(edge.target()))
44    }
45
46    fn input_execution(self, graph: &Graph) -> impl Iterator<Item = NodeIndex> + '_ {
47        graph
48            .edges_directed(self.into(), Direction::Incoming)
49            .filter(|edge| matches!(edge.weight(), GraphEdge::ExecutionFlow))
50            .map(|edge| edge.source())
51    }
52    fn output_execution(self, graph: &Graph) -> impl Iterator<Item = NodeIndex> + '_ {
53        graph
54            .edges_directed(self.into(), Direction::Outgoing)
55            .filter(|edge| matches!(edge.weight(), GraphEdge::ExecutionFlow))
56            .map(|edge| edge.target())
57    }
58
59    /// Adds an execution flow from the given node to this node.
60    fn run_after(self, graph: &mut Graph, node: NodeIndex) {
61        graph.add_edge(node, self.into(), GraphEdge::ExecutionFlow);
62    }
63
64    /// Adds an execution flow from this node to the given node.
65    fn run_before(self, graph: &mut Graph, node: NodeIndex) {
66        graph.add_edge(self.into(), node, GraphEdge::ExecutionFlow);
67    }
68}
69
70#[derive(Debug, Error)]
71pub enum GetStoreError {
72    #[error("No store found")]
73    NoStore,
74}
75
76#[derive(Debug, Clone, Copy)]
77pub struct StoreWrapper(pub NodeIndex);
78
79impl StoreWrapper {
80    /// Returns an iterator over any input stores.
81    pub fn inputs(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
82        graph
83            .edges_directed(self.0, Direction::Incoming)
84            .filter(|edge| matches!(edge.weight(), GraphEdge::DataFlow))
85            .map(|edge| StoreWrapper(edge.source()))
86    }
87    /// Returns an iterator over any output stores.
88    pub fn outputs(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
89        graph
90            .edges_directed(self.0, Direction::Outgoing)
91            .filter(|edge| matches!(edge.weight(), GraphEdge::DataFlow))
92            .map(|edge| StoreWrapper(edge.target()))
93    }
94
95    /// Sets the input of the store.
96    /// This will remove any existing inputs.
97    pub fn set_input(&self, graph: &mut Graph, store: Option<StoreWrapper>) {
98        // Remove any existing inputs.
99        let edges = graph
100            .edges_directed(self.0, Direction::Incoming)
101            .filter(|edge| matches!(edge.weight(), GraphEdge::DataFlow))
102            .map(|edge| edge.id())
103            .collect::<Vec<_>>();
104
105        for edge in edges {
106            graph.remove_edge(edge);
107        }
108
109        // Set the new input.
110        if let Some(store) = store {
111            graph.add_edge(store.0, self.0, GraphEdge::DataFlow);
112        }
113    }
114
115    /// Adds an output edge to the given store.
116    pub fn add_output(&self, graph: &mut Graph, store: StoreWrapper) {
117        graph.add_edge(self.0, store.0, GraphEdge::DataFlow);
118    }
119
120    /// Sets the default value of the store.
121    /// This will be used if no input is set.
122    pub fn set_value(&self, graph: &mut Graph, value: Value) {
123        graph[self.0] = GraphNode::Store(value);
124    }
125}