Skip to main content

frp_engine/
executor.rs

1//! Edge executor — collects input values, runs the transform, and writes outputs.
2
3use std::collections::HashMap;
4
5use frp_domain::HyperEdge;
6use frp_plexus::{PortId, Value};
7
8use crate::error::EngineError;
9use crate::transform::{TransformRegistry, eval_transform};
10
11/// Stateless executor that drives a single [`HyperEdge`].
12pub struct Executor {
13    pub registry: TransformRegistry,
14}
15
16impl Executor {
17    /// Create an executor backed by the given transform registry.
18    pub fn new(registry: TransformRegistry) -> Self {
19        Executor { registry }
20    }
21
22    /// Execute one edge.
23    ///
24    /// 1. Collect input values from `port_values[sources]`.  Missing source
25    ///    ports resolve to `Value::Null`.
26    /// 2. Evaluate the edge transform (may be async).
27    /// 3. Write the result into `port_values` for every target port.
28    pub async fn execute(
29        &self,
30        edge: &HyperEdge,
31        port_values: &mut HashMap<PortId, Value>,
32    ) -> Result<(), EngineError> {
33        let inputs: Vec<Value> = edge
34            .sources
35            .iter()
36            .map(|pid| port_values.get(pid).cloned().unwrap_or(Value::Null))
37            .collect();
38
39        let result = eval_transform(&edge.transform, inputs, &self.registry).await?;
40
41        for &target in &edge.targets {
42            port_values.insert(target, result.clone());
43        }
44
45        Ok(())
46    }
47}
48
49#[cfg(test)]
50mod tests {
51    use super::*;
52    use frp_domain::{EdgeSchedule, EdgeTransform, HyperEdge};
53    use frp_plexus::{EdgeId, PortId, Value};
54    use std::sync::Arc;
55
56    fn make_edge(
57        id: u64,
58        sources: &[u64],
59        targets: &[u64],
60        transform: EdgeTransform,
61    ) -> HyperEdge {
62        HyperEdge::new(
63            EdgeId::new(id),
64            sources.iter().map(|&p| PortId::new(p)).collect(),
65            targets.iter().map(|&p| PortId::new(p)).collect(),
66            transform,
67            EdgeSchedule::OnChange,
68        )
69    }
70
71    #[tokio::test]
72    async fn passthrough_copies_value() {
73        let exec = Executor::new(TransformRegistry::new());
74        let edge = make_edge(1, &[1], &[2], EdgeTransform::PassThrough);
75        let mut ports = HashMap::new();
76        ports.insert(PortId::new(1), Value::Int(42));
77
78        exec.execute(&edge, &mut ports).await.unwrap();
79
80        assert_eq!(ports[&PortId::new(2)], Value::Int(42));
81    }
82
83    #[tokio::test]
84    async fn missing_source_resolves_to_null() {
85        let exec = Executor::new(TransformRegistry::new());
86        let edge = make_edge(1, &[99], &[2], EdgeTransform::PassThrough);
87        let mut ports = HashMap::new();
88
89        exec.execute(&edge, &mut ports).await.unwrap();
90
91        assert_eq!(ports[&PortId::new(2)], Value::Null);
92    }
93
94    #[tokio::test]
95    async fn named_transform_applied() {
96        let mut reg = TransformRegistry::new();
97        reg.register("negate", |inputs| {
98            if let Some(Value::Int(n)) = inputs.first() {
99                Value::Int(-n)
100            } else {
101                Value::Null
102            }
103        });
104        let exec = Executor::new(reg);
105        let edge = make_edge(1, &[1], &[2], EdgeTransform::Named("negate".to_string()));
106        let mut ports = HashMap::new();
107        ports.insert(PortId::new(1), Value::Int(7));
108
109        exec.execute(&edge, &mut ports).await.unwrap();
110
111        assert_eq!(ports[&PortId::new(2)], Value::Int(-7));
112    }
113
114    #[tokio::test]
115    async fn multiple_targets_all_written() {
116        let exec = Executor::new(TransformRegistry::new());
117        let edge = make_edge(1, &[1], &[2, 3, 4], EdgeTransform::PassThrough);
118        let mut ports = HashMap::new();
119        ports.insert(PortId::new(1), Value::Bool(true));
120
121        exec.execute(&edge, &mut ports).await.unwrap();
122
123        assert_eq!(ports[&PortId::new(2)], Value::Bool(true));
124        assert_eq!(ports[&PortId::new(3)], Value::Bool(true));
125        assert_eq!(ports[&PortId::new(4)], Value::Bool(true));
126    }
127
128    #[tokio::test]
129    async fn inline_transform_applied() {
130        let exec = Executor::new(TransformRegistry::new());
131        let t = EdgeTransform::Inline(Arc::new(|_| Value::Int(100)));
132        let edge = make_edge(1, &[], &[5], t);
133        let mut ports = HashMap::new();
134
135        exec.execute(&edge, &mut ports).await.unwrap();
136
137        assert_eq!(ports[&PortId::new(5)], Value::Int(100));
138    }
139}