1use petgraph::{graph::NodeIndex, visit::EdgeRef, Direction};
2use std::future::Future;
3use thiserror::Error;
4
5mod log;
6
7pub use log::LogNode;
8
9use crate::{Graph, GraphEdge, GraphNode, Value};
10
11#[derive(Debug, Error)]
12pub enum NodeError {
13 #[error("Missing input at index {0}")]
14 MissingInput(usize),
15 #[error("Conversion error, got {0:?}")]
16 ConversionError(Value),
17 #[error("Internal error: {0}")]
18 InternalError(String),
19}
20
21pub trait AsyncNode {
22 fn run(
23 &self,
24 inputs: Vec<Value>,
25 ) -> Box<dyn Future<Output = Result<Vec<Value>, NodeError>> + Unpin>;
26}
27
28pub trait SyncNode {
29 fn run(&self, inputs: Vec<Value>) -> Result<Vec<Value>, NodeError>;
30}
31
32pub trait NodeWrapper: Copy + Into<NodeIndex> {
33 fn input_stores(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
34 graph
35 .edges_directed(self.into(), Direction::Incoming)
36 .filter(|edge| matches!(edge.weight(), GraphEdge::DataMap(_)))
37 .map(|edge| StoreWrapper(edge.source()))
38 }
39 fn output_stores(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
40 graph
41 .edges_directed(self.into(), Direction::Outgoing)
42 .filter(|edge| matches!(edge.weight(), GraphEdge::DataMap(_)))
43 .map(|edge| StoreWrapper(edge.target()))
44 }
45
46 fn input_execution(self, graph: &Graph) -> impl Iterator<Item = NodeIndex> + '_ {
47 graph
48 .edges_directed(self.into(), Direction::Incoming)
49 .filter(|edge| matches!(edge.weight(), GraphEdge::ExecutionFlow))
50 .map(|edge| edge.source())
51 }
52 fn output_execution(self, graph: &Graph) -> impl Iterator<Item = NodeIndex> + '_ {
53 graph
54 .edges_directed(self.into(), Direction::Outgoing)
55 .filter(|edge| matches!(edge.weight(), GraphEdge::ExecutionFlow))
56 .map(|edge| edge.target())
57 }
58
59 fn run_after(self, graph: &mut Graph, node: NodeIndex) {
61 graph.add_edge(node, self.into(), GraphEdge::ExecutionFlow);
62 }
63
64 fn run_before(self, graph: &mut Graph, node: NodeIndex) {
66 graph.add_edge(self.into(), node, GraphEdge::ExecutionFlow);
67 }
68}
69
70#[derive(Debug, Error)]
71pub enum GetStoreError {
72 #[error("No store found")]
73 NoStore,
74}
75
76#[derive(Debug, Clone, Copy)]
77pub struct StoreWrapper(pub NodeIndex);
78
79impl StoreWrapper {
80 pub fn inputs(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
82 graph
83 .edges_directed(self.0, Direction::Incoming)
84 .filter(|edge| matches!(edge.weight(), GraphEdge::DataFlow))
85 .map(|edge| StoreWrapper(edge.source()))
86 }
87 pub fn outputs(self, graph: &Graph) -> impl Iterator<Item = StoreWrapper> + '_ {
89 graph
90 .edges_directed(self.0, Direction::Outgoing)
91 .filter(|edge| matches!(edge.weight(), GraphEdge::DataFlow))
92 .map(|edge| StoreWrapper(edge.target()))
93 }
94
95 pub fn set_input(&self, graph: &mut Graph, store: Option<StoreWrapper>) {
98 let edges = graph
100 .edges_directed(self.0, Direction::Incoming)
101 .filter(|edge| matches!(edge.weight(), GraphEdge::DataFlow))
102 .map(|edge| edge.id())
103 .collect::<Vec<_>>();
104
105 for edge in edges {
106 graph.remove_edge(edge);
107 }
108
109 if let Some(store) = store {
111 graph.add_edge(store.0, self.0, GraphEdge::DataFlow);
112 }
113 }
114
115 pub fn add_output(&self, graph: &mut Graph, store: StoreWrapper) {
117 graph.add_edge(self.0, store.0, GraphEdge::DataFlow);
118 }
119
120 pub fn set_value(&self, graph: &mut Graph, value: Value) {
123 graph[self.0] = GraphNode::Store(value);
124 }
125}