// frp_engine/graph.rs
1//! [`Graph`] — the live container that ties together blocks, edges, atoms,
2//! port values, scheduling, and execution.
3
4use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6
7use futures::future::try_join_all;
8use frp_domain::{Atom, Block, HyperEdge};
9use frp_plexus::{AtomId, BlockId, EdgeId, GraphId, PortId, Value};
10
11use crate::error::EngineError;
12use crate::executor::Executor;
13use crate::scheduler::Scheduler;
14use crate::toposort::toposort;
15use crate::transform::{TransformRegistry, eval_transform};
16
/// A live, mutable graph of blocks, edges, and atoms.
///
/// `Graph` owns all domain objects and drives the execution loop.  Typical
/// usage:
/// 1. Create via `Graph::new`.
/// 2. Add blocks / edges / atoms.
/// 3. Call `set_port_value` to inject inputs.
/// 4. Call `run_pending` to flush the scheduler queue.
/// 5. Read output values via `get_port_value`.
pub struct Graph {
    /// Unique id for this graph.
    pub id: GraphId,
    /// All blocks keyed by id.
    pub blocks: HashMap<BlockId, Block>,
    /// All edges keyed by id.
    pub edges: HashMap<EdgeId, HyperEdge>,
    /// All atoms keyed by id.
    pub atoms: HashMap<AtomId, Atom>,
    /// Current port values.
    pub port_values: HashMap<PortId, Value>,
    /// Edges in topological execution order (rebuilt on structural changes).
    /// Delay edges are excluded — they are tracked separately.
    sorted_edges: Vec<EdgeId>,
    /// IDs of edges marked `delay: true`.
    delay_edge_ids: HashSet<EdgeId>,
    /// Pending output values from delay edges; flushed into `port_values` at
    /// the start of the next `run_pending` call.
    delay_buffer: HashMap<PortId, Value>,
    /// Decides which edges become pending in response to port changes
    /// (`notify_change`), elapsed time (`tick`), and named events
    /// (`fire_event`); drained by `run_pending`.
    scheduler: Scheduler,
    /// Evaluates edge transforms; owns the `TransformRegistry` passed to
    /// `Graph::new` (exposed as `executor.registry`).
    executor: Executor,
}
48
49impl Graph {
50    /// Create an empty graph with the given id and transform registry.
51    pub fn new(id: GraphId, registry: TransformRegistry) -> Self {
52        Graph {
53            id,
54            blocks: HashMap::new(),
55            edges: HashMap::new(),
56            atoms: HashMap::new(),
57            port_values: HashMap::new(),
58            sorted_edges: Vec::new(),
59            delay_edge_ids: HashSet::new(),
60            delay_buffer: HashMap::new(),
61            scheduler: Scheduler::new(),
62            executor: Executor::new(registry),
63        }
64    }
65
66    // ── structural mutations ──────────────────────────────────────────────
67
68    /// Add a block to the graph.
69    pub fn add_block(&mut self, block: Block) {
70        self.blocks.insert(block.id, block);
71    }
72
73    /// Add an edge to the graph and register it with the scheduler.
74    ///
75    /// Delay edges (`edge.delay == true`) are stored in `delay_edge_ids` and
76    /// excluded from the topological sort, allowing them to participate in
77    /// feedback cycles without triggering `CycleDetected`.
78    pub fn add_edge(&mut self, edge: HyperEdge) -> Result<(), EngineError> {
79        self.scheduler.register(&edge);
80        if edge.delay {
81            self.delay_edge_ids.insert(edge.id);
82        }
83        self.edges.insert(edge.id, edge);
84        self.rebuild_sort()
85    }
86
87    /// Remove a block by id (no-op if not present).
88    pub fn remove_block(&mut self, id: BlockId) {
89        self.blocks.remove(&id);
90    }
91
92    /// Remove an edge by id and rebuild the topological sort.
93    pub fn remove_edge(&mut self, id: EdgeId) -> Result<(), EngineError> {
94        self.edges.remove(&id);
95        self.delay_edge_ids.remove(&id);
96        self.rebuild_sort()
97    }
98
99    /// Add an atom to the graph.
100    pub fn add_atom(&mut self, atom: Atom) {
101        self.atoms.insert(atom.id, atom);
102    }
103
104    // ── port value access ─────────────────────────────────────────────────
105
    /// Write a port value and notify the scheduler.
    ///
    /// The notification makes edges sourced from `port` eligible to run; they
    /// execute on the next `run_pending` / `tick` / `fire_event` call, not
    /// synchronously here.
    pub fn set_port_value(&mut self, port: PortId, value: Value) {
        self.port_values.insert(port, value);
        self.scheduler.notify_change(port);
    }
111
    /// Read the current value of a port.
    ///
    /// Returns `None` if the port has never been written — neither via
    /// `set_port_value` nor by an executed edge.
    pub fn get_port_value(&self, port: PortId) -> Option<&Value> {
        self.port_values.get(&port)
    }
116
117    // ── execution ─────────────────────────────────────────────────────────
118
    /// Advance the tick-based scheduler by `delta` and run any newly pending edges.
    ///
    /// Edges with `EdgeSchedule::OnTick` whose interval elapses within `delta`
    /// become pending and are executed by the `run_pending` call below.
    pub async fn tick(&mut self, delta: Duration) -> Result<(), EngineError> {
        self.scheduler.tick(delta);
        self.run_pending().await
    }
124
    /// Fire a named event and run any newly pending edges.
    ///
    /// Edges with `EdgeSchedule::OnEvent(name)` become pending and are
    /// executed immediately by the `run_pending` call below.
    pub async fn fire_event(&mut self, name: &str) -> Result<(), EngineError> {
        self.scheduler.fire_event(name);
        self.run_pending().await
    }
130
131    /// Drain the scheduler queue and execute pending edges in parallel waves.
132    ///
133    /// **Delay buffer flush:** at the start of each call the delay buffer from
134    /// the previous tick is applied to `port_values` and its ports are
135    /// notified, so their downstream normal edges are included in this tick's
136    /// execution.
137    ///
138    /// **Normal edges** are grouped into independent waves by
139    /// [`compute_levels`](Self::compute_levels) and executed concurrently
140    /// within each wave via [`try_join_all`].
141    ///
142    /// **Delay edges** run after all normal waves.  Their outputs are written
143    /// into the delay buffer rather than `port_values`, deferring the effect
144    /// to the next tick.
145    pub async fn run_pending(&mut self) -> Result<(), EngineError> {
146        // ── 1. Flush delay buffer from previous tick ──────────────────────
147        if !self.delay_buffer.is_empty() {
148            let flushed: Vec<(PortId, Value)> = self.delay_buffer.drain().collect();
149            for (port, val) in flushed {
150                self.port_values.insert(port, val);
151                self.scheduler.notify_change(port);
152            }
153        }
154
155        // ── 2. Drain scheduler (includes edges triggered by the flush) ────
156        let pending = self.scheduler.drain_pending();
157        if pending.is_empty() {
158            return Ok(());
159        }
160
161        let pending_set: HashSet<EdgeId> = pending.into_iter().collect();
162
163        // ── 3. Partition into normal vs delay ─────────────────────────────
164        let normal_set: HashSet<EdgeId> = pending_set
165            .iter()
166            .copied()
167            .filter(|id| !self.delay_edge_ids.contains(id))
168            .collect();
169        let delay_set: HashSet<EdgeId> = pending_set
170            .iter()
171            .copied()
172            .filter(|id| self.delay_edge_ids.contains(id))
173            .collect();
174
175        // ── 4. Execute normal waves in parallel ───────────────────────────
176        if !normal_set.is_empty() {
177            let waves = Self::compute_levels(&normal_set, &self.sorted_edges, &self.edges);
178
179            for wave in waves {
180                let tasks: Vec<(HyperEdge, Vec<Value>)> = wave
181                    .iter()
182                    .map(|&eid| {
183                        let edge = self.edges[&eid].clone();
184                        let inputs: Vec<Value> = edge
185                            .sources
186                            .iter()
187                            .map(|pid| self.port_values.get(pid).cloned().unwrap_or(Value::Null))
188                            .collect();
189                        (edge, inputs)
190                    })
191                    .collect();
192
193                let futures_iter = tasks
194                    .iter()
195                    .map(|(edge, inputs)| eval_transform(&edge.transform, inputs.clone(), &self.executor.registry));
196                let results: Vec<Value> = try_join_all(futures_iter).await?;
197
198                for ((edge, _), result) in tasks.iter().zip(results.iter()) {
199                    for &target in &edge.targets {
200                        self.port_values.insert(target, result.clone());
201                    }
202                }
203            }
204        }
205
206        // ── 5. Execute delay edges — buffer outputs for next tick ─────────
207        for &eid in &delay_set {
208            let edge = match self.edges.get(&eid) {
209                Some(e) => e.clone(),
210                None => continue,
211            };
212            let inputs: Vec<Value> = edge
213                .sources
214                .iter()
215                .map(|pid| self.port_values.get(pid).cloned().unwrap_or(Value::Null))
216                .collect();
217            let result = eval_transform(&edge.transform, inputs, &self.executor.registry).await?;
218            for &target in &edge.targets {
219                self.delay_buffer.insert(target, result.clone());
220            }
221        }
222
223        Ok(())
224    }
225
226    // ── helpers ───────────────────────────────────────────────────────────
227
228    /// Group `pending_set` edges into independent waves (levels) based on their
229    /// inter-dependencies within the pending set.
230    ///
231    /// Edges with no pending predecessors are assigned level 0 and can run in
232    /// parallel.  An edge whose pending predecessor has level *k* gets level
233    /// *k + 1*.  Non-pending predecessors are ignored because their outputs are
234    /// already settled in `port_values` before `run_pending` is called.
235    fn compute_levels(
236        pending_set: &HashSet<EdgeId>,
237        sorted_edges: &[EdgeId],
238        edges: &HashMap<EdgeId, HyperEdge>,
239    ) -> Vec<Vec<EdgeId>> {
240        // Map: output port → pending edge that produces it.
241        let mut pending_producers: HashMap<PortId, EdgeId> = HashMap::new();
242        for &eid in pending_set {
243            if let Some(edge) = edges.get(&eid) {
244                for &port in &edge.targets {
245                    pending_producers.insert(port, eid);
246                }
247            }
248        }
249
250        let mut level_of: HashMap<EdgeId, usize> = HashMap::new();
251        let mut max_level = 0usize;
252
253        // Walk in topo order so predecessors are always assigned before successors.
254        for &eid in sorted_edges {
255            if !pending_set.contains(&eid) {
256                continue;
257            }
258            let edge = match edges.get(&eid) {
259                Some(e) => e,
260                None => continue,
261            };
262            let level = edge
263                .sources
264                .iter()
265                .filter_map(|port| pending_producers.get(port))
266                .filter_map(|pred| level_of.get(pred))
267                .copied()
268                .max()
269                .map_or(0, |l| l + 1);
270            level_of.insert(eid, level);
271            if level > max_level {
272                max_level = level;
273            }
274        }
275
276        let mut waves: Vec<Vec<EdgeId>> = vec![Vec::new(); max_level + 1];
277        // Re-walk in topo order so edges within a wave are also topo-ordered
278        // (makes behaviour deterministic and easier to reason about).
279        for &eid in sorted_edges {
280            if let Some(&level) = level_of.get(&eid) {
281                waves[level].push(eid);
282            }
283        }
284
285        waves
286    }
287
288    fn rebuild_sort(&mut self) -> Result<(), EngineError> {
289        // Delay edges are excluded: they may form cycles and must not be
290        // fed to Kahn's algorithm.
291        let normal_edges: Vec<HyperEdge> = self
292            .edges
293            .values()
294            .filter(|e| !e.delay)
295            .cloned()
296            .collect();
297        self.sorted_edges = toposort(&normal_edges)?;
298        Ok(())
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use frp_domain::{
        Atom, AtomKind, AtomMeta, Block, BlockSchema, EdgeSchedule, EdgeTransform, HyperEdge,
        Meta,
    };
    use frp_plexus::{
        AtomId, BlockId, EdgeId, GraphId, LayerTag, PortId, Value,
    };

    /// Build a pass-through `OnChange` edge from one source port to one
    /// target port.
    fn simple_edge(id: u64, src: u64, tgt: u64) -> HyperEdge {
        HyperEdge::new(
            EdgeId::new(id),
            vec![PortId::new(src)],
            vec![PortId::new(tgt)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        )
    }

    /// Build a block with an empty schema, no atoms, and default metadata.
    fn simple_block(id: u64) -> Block {
        Block {
            id: BlockId::new(id),
            schema: BlockSchema::new(vec![], vec![]),
            atoms: vec![],
            meta: Meta::default(),
        }
    }

    /// Build a transform atom tagged with the core layer.
    fn simple_atom(id: u64) -> Atom {
        Atom::new(
            AtomId::new(id),
            AtomKind::Transform,
            AtomMeta::new("test".to_string(), LayerTag::Core),
        )
    }

    /// Blocks are stored on add and gone after remove.
    #[test]
    fn add_and_remove_block() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.add_block(simple_block(10));
        assert!(g.blocks.contains_key(&BlockId::new(10)));
        g.remove_block(BlockId::new(10));
        assert!(!g.blocks.contains_key(&BlockId::new(10)));
    }

    /// Insertion order must not matter: the producer edge sorts before the
    /// consumer edge regardless of which was added first.
    #[test]
    fn add_edge_sorts_topologically() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        // edge 2 depends on port 5 which is written by edge 1
        g.add_edge(HyperEdge::new(
            EdgeId::new(2),
            vec![PortId::new(5)],
            vec![PortId::new(6)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![],
            vec![PortId::new(5)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        let pos_1 = g.sorted_edges.iter().position(|&id| id == EdgeId::new(1)).unwrap();
        let pos_2 = g.sorted_edges.iter().position(|&id| id == EdgeId::new(2)).unwrap();
        assert!(pos_1 < pos_2);
    }

    /// `get_port_value` round-trips a written value and is `None` for
    /// never-written ports.
    #[test]
    fn set_and_get_port_value() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.set_port_value(PortId::new(1), Value::Int(99));
        assert_eq!(g.get_port_value(PortId::new(1)), Some(&Value::Int(99)));
        assert_eq!(g.get_port_value(PortId::new(2)), None);
    }

    /// A pending `OnChange` edge copies its input to its target on
    /// `run_pending`.
    #[tokio::test]
    async fn run_pending_propagates_value() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.add_edge(simple_edge(1, 10, 20)).unwrap();

        // set_port_value enqueues edge 1 (OnChange)
        g.set_port_value(PortId::new(10), Value::Int(7));
        g.run_pending().await.unwrap();

        assert_eq!(g.get_port_value(PortId::new(20)), Some(&Value::Int(7)));
    }

    /// An `OnTick(100ms)` edge fires when 150ms of tick time elapses.
    #[tokio::test]
    async fn tick_fires_on_tick_edge() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![PortId::new(1)],
            vec![PortId::new(2)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnTick(Duration::from_millis(100)),
        ))
        .unwrap();
        // Write the input directly (bypassing set_port_value) so no OnChange
        // notification is involved — only the tick can trigger the edge.
        g.port_values.insert(PortId::new(1), Value::Bool(true));

        g.tick(Duration::from_millis(150)).await.unwrap();
        assert_eq!(g.get_port_value(PortId::new(2)), Some(&Value::Bool(true)));
    }

    /// An `OnEvent("ping")` edge fires when that event name is fired.
    #[tokio::test]
    async fn fire_event_triggers_edge() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![PortId::new(1)],
            vec![PortId::new(2)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnEvent("ping".to_string()),
        ))
        .unwrap();
        g.port_values.insert(PortId::new(1), Value::Int(42));

        g.fire_event("ping").await.unwrap();
        assert_eq!(g.get_port_value(PortId::new(2)), Some(&Value::Int(42)));
    }

    /// Two non-delay edges forming a cycle cause the second `add_edge` to
    /// fail with `CycleDetected`.
    #[test]
    fn cycle_in_edges_returns_error() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![PortId::new(2)],
            vec![PortId::new(1)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();
        let err = g
            .add_edge(HyperEdge::new(
                EdgeId::new(2),
                vec![PortId::new(1)],
                vec![PortId::new(2)],
                EdgeTransform::PassThrough,
                EdgeSchedule::OnChange,
            ))
            .unwrap_err();
        assert!(matches!(err, EngineError::CycleDetected));
    }

    /// Atoms are stored keyed by their id.
    #[test]
    fn add_atom_stored() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());
        g.add_atom(simple_atom(5));
        assert!(g.atoms.contains_key(&AtomId::new(5)));
    }

    /// A delay edge's output must not appear in `port_values` until the
    /// *second* `run_pending` call (one tick later).
    #[tokio::test]
    async fn delay_edge_output_deferred_one_tick() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());

        // Delay edge: port 1 → port 2.
        g.add_edge(
            HyperEdge::new(
                EdgeId::new(1),
                vec![PortId::new(1)],
                vec![PortId::new(2)],
                EdgeTransform::PassThrough,
                EdgeSchedule::OnChange,
            )
            .with_delay(),
        )
        .unwrap();

        g.set_port_value(PortId::new(1), Value::Int(99));

        // Tick 1: delay edge fires, result goes to buffer, NOT port_values.
        g.run_pending().await.unwrap();
        assert_eq!(g.get_port_value(PortId::new(2)), None,
            "delay edge output must not appear until the next tick");

        // Tick 2: buffer is flushed at the start of run_pending.
        // No new pending edges, but the flush itself writes port 2.
        g.run_pending().await.unwrap();
        assert_eq!(g.get_port_value(PortId::new(2)), Some(&Value::Int(99)));
    }

    /// A feedback cycle (normal edge A→B, delay edge B→A) must not produce
    /// `CycleDetected` and must propagate values correctly across ticks.
    #[tokio::test]
    async fn delay_edge_allows_feedback_cycle() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());

        // Normal edge: port 10 → port 20.
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![PortId::new(10)],
            vec![PortId::new(20)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        // Delay feedback edge: port 20 → port 10 (completes the cycle).
        // Both add_edge calls must succeed — no CycleDetected.
        g.add_edge(
            HyperEdge::new(
                EdgeId::new(2),
                vec![PortId::new(20)],
                vec![PortId::new(10)],
                EdgeTransform::PassThrough,
                EdgeSchedule::OnChange,
            )
            .with_delay(),
        )
        .unwrap();

        // Tick 1: inject 5 into port 10.
        // Normal edge fires → port 20 = 5.
        // Delay edge fires → delay_buffer[port 10] = 5.
        g.set_port_value(PortId::new(10), Value::Int(5));
        g.run_pending().await.unwrap();
        assert_eq!(g.get_port_value(PortId::new(20)), Some(&Value::Int(5)));
        // Port 10 is still the externally-set value, not yet overwritten.
        assert_eq!(g.get_port_value(PortId::new(10)), Some(&Value::Int(5)));

        // Tick 2: flush writes port 10 = 5 (feedback), which re-triggers
        // edge 1. Port 20 stays = 5.
        g.run_pending().await.unwrap();
        assert_eq!(g.get_port_value(PortId::new(10)), Some(&Value::Int(5)));
        assert_eq!(g.get_port_value(PortId::new(20)), Some(&Value::Int(5)));
    }

    /// Three edges with completely disjoint ports — all land in wave 0 and
    /// execute concurrently.  This test validates correctness of the parallel
    /// path: every output must carry the right value even when transforms run
    /// simultaneously.
    #[tokio::test]
    async fn parallel_independent_edges_all_execute() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());

        // Edge A: port 10 → port 20
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![PortId::new(10)],
            vec![PortId::new(20)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        // Edge B: port 30 → port 40
        g.add_edge(HyperEdge::new(
            EdgeId::new(2),
            vec![PortId::new(30)],
            vec![PortId::new(40)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        // Edge C: port 50 → port 60
        g.add_edge(HyperEdge::new(
            EdgeId::new(3),
            vec![PortId::new(50)],
            vec![PortId::new(60)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        g.set_port_value(PortId::new(10), Value::Int(1));
        g.set_port_value(PortId::new(30), Value::Int(2));
        g.set_port_value(PortId::new(50), Value::Int(3));

        g.run_pending().await.unwrap();

        assert_eq!(g.get_port_value(PortId::new(20)), Some(&Value::Int(1)));
        assert_eq!(g.get_port_value(PortId::new(40)), Some(&Value::Int(2)));
        assert_eq!(g.get_port_value(PortId::new(60)), Some(&Value::Int(3)));
    }

    /// Two chained edges (A writes port 5, B reads port 5) must execute in
    /// separate waves even when both are pending at the same time.
    #[tokio::test]
    async fn chained_edges_execute_in_order() {
        let mut g = Graph::new(GraphId::new(1), TransformRegistry::new());

        // Edge A: port 1 → port 5
        g.add_edge(HyperEdge::new(
            EdgeId::new(1),
            vec![PortId::new(1)],
            vec![PortId::new(5)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        // Edge B: port 5 → port 9
        g.add_edge(HyperEdge::new(
            EdgeId::new(2),
            vec![PortId::new(5)],
            vec![PortId::new(9)],
            EdgeTransform::PassThrough,
            EdgeSchedule::OnChange,
        ))
        .unwrap();

        g.set_port_value(PortId::new(1), Value::Int(42));
        // Manually mark edge 2 as pending too so both are drained together.
        g.scheduler.notify_change(PortId::new(5));

        g.run_pending().await.unwrap();

        // After wave 0 (edge A) writes port 5 = 42, wave 1 (edge B) reads
        // it and writes port 9 = 42.
        assert_eq!(g.get_port_value(PortId::new(9)), Some(&Value::Int(42)));
    }
}