Skip to main content

jellyflow_runtime/runtime/lookups/
connections.rs

1use std::collections::HashMap;
2
3use super::{ConnectionLookupKey, ConnectionSide, HandleConnection, NodeGraphLookups};
4use jellyflow_core::core::{EdgeId, NodeId, PortId};
5
6impl NodeGraphLookups {
7    pub fn connections(
8        &self,
9        key: ConnectionLookupKey,
10    ) -> Option<&HashMap<EdgeId, HandleConnection>> {
11        self.connection_lookup.get(&key)
12    }
13
14    pub fn connections_for_node(&self, node: NodeId) -> Option<&HashMap<EdgeId, HandleConnection>> {
15        self.connections(ConnectionLookupKey::Node(node))
16    }
17
18    pub fn connections_for_node_side(
19        &self,
20        node: NodeId,
21        side: ConnectionSide,
22    ) -> Option<&HashMap<EdgeId, HandleConnection>> {
23        self.connections(ConnectionLookupKey::NodeSide { node, side })
24    }
25
26    pub fn connections_for_port(
27        &self,
28        node: NodeId,
29        side: ConnectionSide,
30        port: PortId,
31    ) -> Option<&HashMap<EdgeId, HandleConnection>> {
32        self.connections(ConnectionLookupKey::NodeSidePort { node, side, port })
33    }
34
35    pub fn connection_for_edge(&self, edge: EdgeId) -> Option<HandleConnection> {
36        let entry = *self.edge_lookup.get(&edge)?;
37        Some(HandleConnection::from_edge_lookup(edge, entry))
38    }
39
40    fn add_connection(&mut self, key: ConnectionLookupKey, conn: HandleConnection) {
41        self.connection_lookup
42            .entry(key)
43            .or_default()
44            .insert(conn.edge, conn);
45    }
46
47    fn remove_connection(&mut self, key: ConnectionLookupKey, edge: EdgeId) {
48        let Some(map) = self.connection_lookup.get_mut(&key) else {
49            return;
50        };
51        map.remove(&edge);
52        if map.is_empty() {
53            self.connection_lookup.remove(&key);
54        }
55    }
56
57    pub(super) fn add_edge_connection(&mut self, entry: HandleConnection) {
58        for key in entry.lookup_keys() {
59            self.add_connection(key, entry);
60        }
61    }
62
63    pub(super) fn remove_edge_connection(&mut self, entry: HandleConnection) {
64        for key in entry.lookup_keys() {
65            self.remove_connection(key, entry.edge);
66        }
67    }
68
69    pub(super) fn slow_remove_edge_from_connection_lookup(&mut self, edge: EdgeId) {
70        let mut empty: Vec<ConnectionLookupKey> = Vec::new();
71        for (key, map) in &mut self.connection_lookup {
72            map.remove(&edge);
73            if map.is_empty() {
74                empty.push(*key);
75            }
76        }
77        for k in empty {
78            self.connection_lookup.remove(&k);
79        }
80    }
81
82    pub(super) fn connection_from_edge_lookup(&self, edge: EdgeId) -> Option<HandleConnection> {
83        self.connection_for_edge(edge)
84    }
85}