dagrs/graph/
loop_subgraph.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4
5use crate::{EnvVar, InChannels, Node, NodeId, NodeName, NodeTable, OutChannels, Output};
6
7/// A special node type that represents a subgraph of nodes in a loop structure.
8///
9/// The LoopSubgraph is included in the main graph as a single node, but internally contains
10/// multiple nodes that will be executed repeatedly. The connection and execution of the loop is controlled
11/// by the parent graph rather than the LoopSubgraph itself.
12pub struct LoopSubgraph {
13    id: NodeId,
14    name: NodeName,
15    in_channels: InChannels,
16    out_channels: OutChannels,
17    // Inner nodes, contains the nodes that need to be executed in a loop
18    inner_nodes: Vec<Arc<Mutex<dyn Node>>>,
19}
20
21impl LoopSubgraph {
22    pub fn new(name: NodeName, node_table: &mut NodeTable) -> Self {
23        Self {
24            id: node_table.alloc_id_for(&name),
25            name,
26            in_channels: InChannels::default(),
27            out_channels: OutChannels::default(),
28            inner_nodes: Vec::new(),
29        }
30    }
31
32    /// Add a node to the subgraph
33    pub fn add_node(&mut self, node: impl Node + 'static) {
34        self.inner_nodes.push(Arc::new(Mutex::new(node)));
35    }
36}
37
38#[async_trait]
39impl Node for LoopSubgraph {
40    fn id(&self) -> NodeId {
41        self.id
42    }
43
44    fn name(&self) -> NodeName {
45        self.name.clone()
46    }
47
48    fn input_channels(&mut self) -> &mut InChannels {
49        &mut self.in_channels
50    }
51
52    fn output_channels(&mut self) -> &mut OutChannels {
53        &mut self.out_channels
54    }
55
56    fn loop_structure(&self) -> Option<Vec<Arc<Mutex<dyn Node>>>> {
57        Some(self.inner_nodes.clone())
58    }
59
60    async fn run(&mut self, _: Arc<EnvVar>) -> Output {
61        panic!("Loop subgraph is not executed directly, it will be executed by the parent graph.");
62    }
63}