1use std::collections::{HashMap, HashSet};
5use std::time::Duration;
6
7use futures::future::try_join_all;
8use frp_domain::{Atom, Block, HyperEdge};
9use frp_plexus::{AtomId, BlockId, EdgeId, GraphId, PortId, Value};
10
11use crate::error::EngineError;
12use crate::executor::Executor;
13use crate::scheduler::Scheduler;
14use crate::toposort::toposort;
15use crate::transform::{TransformRegistry, eval_transform};
16
17pub struct Graph {
27 pub id: GraphId,
29 pub blocks: HashMap<BlockId, Block>,
31 pub edges: HashMap<EdgeId, HyperEdge>,
33 pub atoms: HashMap<AtomId, Atom>,
35 pub port_values: HashMap<PortId, Value>,
37 sorted_edges: Vec<EdgeId>,
40 delay_edge_ids: HashSet<EdgeId>,
42 delay_buffer: HashMap<PortId, Value>,
45 scheduler: Scheduler,
46 executor: Executor,
47}
48
49impl Graph {
50 pub fn new(id: GraphId, registry: TransformRegistry) -> Self {
52 Graph {
53 id,
54 blocks: HashMap::new(),
55 edges: HashMap::new(),
56 atoms: HashMap::new(),
57 port_values: HashMap::new(),
58 sorted_edges: Vec::new(),
59 delay_edge_ids: HashSet::new(),
60 delay_buffer: HashMap::new(),
61 scheduler: Scheduler::new(),
62 executor: Executor::new(registry),
63 }
64 }
65
66 pub fn add_block(&mut self, block: Block) {
70 self.blocks.insert(block.id, block);
71 }
72
73 pub fn add_edge(&mut self, edge: HyperEdge) -> Result<(), EngineError> {
79 self.scheduler.register(&edge);
80 if edge.delay {
81 self.delay_edge_ids.insert(edge.id);
82 }
83 self.edges.insert(edge.id, edge);
84 self.rebuild_sort()
85 }
86
87 pub fn remove_block(&mut self, id: BlockId) {
89 self.blocks.remove(&id);
90 }
91
92 pub fn remove_edge(&mut self, id: EdgeId) -> Result<(), EngineError> {
94 self.edges.remove(&id);
95 self.delay_edge_ids.remove(&id);
96 self.rebuild_sort()
97 }
98
99 pub fn add_atom(&mut self, atom: Atom) {
101 self.atoms.insert(atom.id, atom);
102 }
103
104 pub fn set_port_value(&mut self, port: PortId, value: Value) {
108 self.port_values.insert(port, value);
109 self.scheduler.notify_change(port);
110 }
111
112 pub fn get_port_value(&self, port: PortId) -> Option<&Value> {
114 self.port_values.get(&port)
115 }
116
117 pub async fn tick(&mut self, delta: Duration) -> Result<(), EngineError> {
121 self.scheduler.tick(delta);
122 self.run_pending().await
123 }
124
125 pub async fn fire_event(&mut self, name: &str) -> Result<(), EngineError> {
127 self.scheduler.fire_event(name);
128 self.run_pending().await
129 }
130
131 pub async fn run_pending(&mut self) -> Result<(), EngineError> {
146 if !self.delay_buffer.is_empty() {
148 let flushed: Vec<(PortId, Value)> = self.delay_buffer.drain().collect();
149 for (port, val) in flushed {
150 self.port_values.insert(port, val);
151 self.scheduler.notify_change(port);
152 }
153 }
154
155 let pending = self.scheduler.drain_pending();
157 if pending.is_empty() {
158 return Ok(());
159 }
160
161 let pending_set: HashSet<EdgeId> = pending.into_iter().collect();
162
163 let normal_set: HashSet<EdgeId> = pending_set
165 .iter()
166 .copied()
167 .filter(|id| !self.delay_edge_ids.contains(id))
168 .collect();
169 let delay_set: HashSet<EdgeId> = pending_set
170 .iter()
171 .copied()
172 .filter(|id| self.delay_edge_ids.contains(id))
173 .collect();
174
175 if !normal_set.is_empty() {
177 let waves = Self::compute_levels(&normal_set, &self.sorted_edges, &self.edges);
178
179 for wave in waves {
180 let tasks: Vec<(HyperEdge, Vec<Value>)> = wave
181 .iter()
182 .map(|&eid| {
183 let edge = self.edges[&eid].clone();
184 let inputs: Vec<Value> = edge
185 .sources
186 .iter()
187 .map(|pid| self.port_values.get(pid).cloned().unwrap_or(Value::Null))
188 .collect();
189 (edge, inputs)
190 })
191 .collect();
192
193 let futures_iter = tasks
194 .iter()
195 .map(|(edge, inputs)| eval_transform(&edge.transform, inputs.clone(), &self.executor.registry));
196 let results: Vec<Value> = try_join_all(futures_iter).await?;
197
198 for ((edge, _), result) in tasks.iter().zip(results.iter()) {
199 for &target in &edge.targets {
200 self.port_values.insert(target, result.clone());
201 }
202 }
203 }
204 }
205
206 for &eid in &delay_set {
208 let edge = match self.edges.get(&eid) {
209 Some(e) => e.clone(),
210 None => continue,
211 };
212 let inputs: Vec<Value> = edge
213 .sources
214 .iter()
215 .map(|pid| self.port_values.get(pid).cloned().unwrap_or(Value::Null))
216 .collect();
217 let result = eval_transform(&edge.transform, inputs, &self.executor.registry).await?;
218 for &target in &edge.targets {
219 self.delay_buffer.insert(target, result.clone());
220 }
221 }
222
223 Ok(())
224 }
225
226 fn compute_levels(
236 pending_set: &HashSet<EdgeId>,
237 sorted_edges: &[EdgeId],
238 edges: &HashMap<EdgeId, HyperEdge>,
239 ) -> Vec<Vec<EdgeId>> {
240 let mut pending_producers: HashMap<PortId, EdgeId> = HashMap::new();
242 for &eid in pending_set {
243 if let Some(edge) = edges.get(&eid) {
244 for &port in &edge.targets {
245 pending_producers.insert(port, eid);
246 }
247 }
248 }
249
250 let mut level_of: HashMap<EdgeId, usize> = HashMap::new();
251 let mut max_level = 0usize;
252
253 for &eid in sorted_edges {
255 if !pending_set.contains(&eid) {
256 continue;
257 }
258 let edge = match edges.get(&eid) {
259 Some(e) => e,
260 None => continue,
261 };
262 let level = edge
263 .sources
264 .iter()
265 .filter_map(|port| pending_producers.get(port))
266 .filter_map(|pred| level_of.get(pred))
267 .copied()
268 .max()
269 .map_or(0, |l| l + 1);
270 level_of.insert(eid, level);
271 if level > max_level {
272 max_level = level;
273 }
274 }
275
276 let mut waves: Vec<Vec<EdgeId>> = vec![Vec::new(); max_level + 1];
277 for &eid in sorted_edges {
280 if let Some(&level) = level_of.get(&eid) {
281 waves[level].push(eid);
282 }
283 }
284
285 waves
286 }
287
288 fn rebuild_sort(&mut self) -> Result<(), EngineError> {
289 let normal_edges: Vec<HyperEdge> = self
292 .edges
293 .values()
294 .filter(|e| !e.delay)
295 .cloned()
296 .collect();
297 self.sorted_edges = toposort(&normal_edges)?;
298 Ok(())
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use frp_domain::{
        Atom, AtomKind, AtomMeta, Block, BlockSchema, EdgeSchedule, EdgeTransform, HyperEdge,
        Meta,
    };
    use frp_plexus::{
        AtomId, BlockId, EdgeId, GraphId, LayerTag, PortId, Value,
    };

    /// Graph with id 1 and an empty transform registry, as used by every test.
    fn empty_graph() -> Graph {
        Graph::new(GraphId::new(1), TransformRegistry::new())
    }

    /// Converts raw numbers into port ids.
    fn ports(raw_ids: &[u64]) -> Vec<PortId> {
        raw_ids.iter().map(|&raw| PortId::new(raw)).collect()
    }

    /// Pass-through edge with arbitrary sources, targets and schedule.
    fn pass_through_edge(
        id: u64,
        sources: &[u64],
        targets: &[u64],
        schedule: EdgeSchedule,
    ) -> HyperEdge {
        HyperEdge::new(
            EdgeId::new(id),
            ports(sources),
            ports(targets),
            EdgeTransform::PassThrough,
            schedule,
        )
    }

    /// Pass-through, on-change edge from one source port to one target port.
    fn simple_edge(id: u64, src: u64, tgt: u64) -> HyperEdge {
        pass_through_edge(id, &[src], &[tgt], EdgeSchedule::OnChange)
    }

    /// Block with an empty schema, no atoms and default metadata.
    fn simple_block(id: u64) -> Block {
        Block {
            id: BlockId::new(id),
            schema: BlockSchema::new(vec![], vec![]),
            atoms: vec![],
            meta: Meta::default(),
        }
    }

    /// Transform atom named "test" in the core layer.
    fn simple_atom(id: u64) -> Atom {
        let meta = AtomMeta::new("test".to_string(), LayerTag::Core);
        Atom::new(AtomId::new(id), AtomKind::Transform, meta)
    }

    /// A block is visible after `add_block` and gone after `remove_block`.
    #[test]
    fn add_and_remove_block() {
        let mut graph = empty_graph();
        let block_id = BlockId::new(10);

        graph.add_block(simple_block(10));
        assert!(graph.blocks.contains_key(&block_id));

        graph.remove_block(block_id);
        assert!(!graph.blocks.contains_key(&block_id));
    }

    /// The producer of port 5 is sorted before its consumer even though it
    /// was added second.
    #[test]
    fn add_edge_sorts_topologically() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(2, 5, 6)).unwrap();
        graph
            .add_edge(pass_through_edge(1, &[], &[5], EdgeSchedule::OnChange))
            .unwrap();

        let index_of = |wanted: EdgeId| {
            graph
                .sorted_edges
                .iter()
                .position(|&id| id == wanted)
                .unwrap()
        };
        let pos_1 = index_of(EdgeId::new(1));
        let pos_2 = index_of(EdgeId::new(2));
        assert!(pos_1 < pos_2);
    }

    /// A written port reads back its value; an unwritten port reads `None`.
    #[test]
    fn set_and_get_port_value() {
        let mut graph = empty_graph();
        graph.set_port_value(PortId::new(1), Value::Int(99));

        assert_eq!(graph.get_port_value(PortId::new(1)), Some(&Value::Int(99)));
        assert_eq!(graph.get_port_value(PortId::new(2)), None);
    }

    /// Changing the source port of an on-change edge propagates to its target.
    #[tokio::test]
    async fn run_pending_propagates_value() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(1, 10, 20)).unwrap();

        graph.set_port_value(PortId::new(10), Value::Int(7));
        graph.run_pending().await.unwrap();

        assert_eq!(graph.get_port_value(PortId::new(20)), Some(&Value::Int(7)));
    }

    /// A 100 ms on-tick edge has propagated its input after a 150 ms tick.
    #[tokio::test]
    async fn tick_fires_on_tick_edge() {
        let mut graph = empty_graph();
        let every_100_ms = EdgeSchedule::OnTick(Duration::from_millis(100));
        graph
            .add_edge(pass_through_edge(1, &[1], &[2], every_100_ms))
            .unwrap();
        // Written directly so that no change notification is involved.
        graph.port_values.insert(PortId::new(1), Value::Bool(true));

        graph.tick(Duration::from_millis(150)).await.unwrap();

        assert_eq!(graph.get_port_value(PortId::new(2)), Some(&Value::Bool(true)));
    }

    /// An edge scheduled on the "ping" event propagates when it is fired.
    #[tokio::test]
    async fn fire_event_triggers_edge() {
        let mut graph = empty_graph();
        let on_ping = EdgeSchedule::OnEvent("ping".to_string());
        graph
            .add_edge(pass_through_edge(1, &[1], &[2], on_ping))
            .unwrap();
        // Written directly so that no change notification is involved.
        graph.port_values.insert(PortId::new(1), Value::Int(42));

        graph.fire_event("ping").await.unwrap();

        assert_eq!(graph.get_port_value(PortId::new(2)), Some(&Value::Int(42)));
    }

    /// Closing a loop with two non-delay edges is rejected as a cycle.
    #[test]
    fn cycle_in_edges_returns_error() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(1, 2, 1)).unwrap();

        let err = graph.add_edge(simple_edge(2, 1, 2)).unwrap_err();

        assert!(matches!(err, EngineError::CycleDetected));
    }

    /// An added atom can be found under its id.
    #[test]
    fn add_atom_stored() {
        let mut graph = empty_graph();
        graph.add_atom(simple_atom(5));
        assert!(graph.atoms.contains_key(&AtomId::new(5)));
    }

    /// The output of a delay edge shows up one `run_pending` call later.
    #[tokio::test]
    async fn delay_edge_output_deferred_one_tick() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(1, 1, 2).with_delay()).unwrap();

        graph.set_port_value(PortId::new(1), Value::Int(99));

        // First pass: the result is buffered, the target stays unset.
        graph.run_pending().await.unwrap();
        assert_eq!(graph.get_port_value(PortId::new(2)), None,
            "delay edge output must not appear until the next tick");

        // Second pass: the buffered result is published.
        graph.run_pending().await.unwrap();
        assert_eq!(graph.get_port_value(PortId::new(2)), Some(&Value::Int(99)));
    }

    /// A loop 10 -> 20 -> 10 is accepted when the back edge is a delay edge,
    /// and both ports hold the seeded value after one and after two passes.
    #[tokio::test]
    async fn delay_edge_allows_feedback_cycle() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(1, 10, 20)).unwrap();
        graph.add_edge(simple_edge(2, 20, 10).with_delay()).unwrap();

        graph.set_port_value(PortId::new(10), Value::Int(5));

        graph.run_pending().await.unwrap();
        assert_eq!(graph.get_port_value(PortId::new(20)), Some(&Value::Int(5)));
        assert_eq!(graph.get_port_value(PortId::new(10)), Some(&Value::Int(5)));

        graph.run_pending().await.unwrap();
        assert_eq!(graph.get_port_value(PortId::new(10)), Some(&Value::Int(5)));
        assert_eq!(graph.get_port_value(PortId::new(20)), Some(&Value::Int(5)));
    }

    /// Three edges that share no ports all run within a single pass.
    #[tokio::test]
    async fn parallel_independent_edges_all_execute() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(1, 10, 20)).unwrap();
        graph.add_edge(simple_edge(2, 30, 40)).unwrap();
        graph.add_edge(simple_edge(3, 50, 60)).unwrap();

        graph.set_port_value(PortId::new(10), Value::Int(1));
        graph.set_port_value(PortId::new(30), Value::Int(2));
        graph.set_port_value(PortId::new(50), Value::Int(3));

        graph.run_pending().await.unwrap();

        assert_eq!(graph.get_port_value(PortId::new(20)), Some(&Value::Int(1)));
        assert_eq!(graph.get_port_value(PortId::new(40)), Some(&Value::Int(2)));
        assert_eq!(graph.get_port_value(PortId::new(60)), Some(&Value::Int(3)));
    }

    /// With both edges of the chain 1 -> 5 -> 9 pending, a single pass carries
    /// the value from port 1 all the way to port 9.
    #[tokio::test]
    async fn chained_edges_execute_in_order() {
        let mut graph = empty_graph();
        graph.add_edge(simple_edge(1, 1, 5)).unwrap();
        graph.add_edge(simple_edge(2, 5, 9)).unwrap();

        graph.set_port_value(PortId::new(1), Value::Int(42));
        // Port 5 is reported as changed by hand, presumably so that the
        // second edge is pending in the same pass as the first.
        graph.scheduler.notify_change(PortId::new(5));

        graph.run_pending().await.unwrap();

        assert_eq!(graph.get_port_value(PortId::new(9)), Some(&Value::Int(42)));
    }
}