// jellyflow_runtime/runtime/lookups/connections.rs

use std::collections::HashMap;

use super::{ConnectionLookupKey, ConnectionSide, HandleConnection, NodeGraphLookups};
use jellyflow_core::core::{EdgeId, NodeId, PortId};

6impl NodeGraphLookups {
7 pub fn connections(
8 &self,
9 key: ConnectionLookupKey,
10 ) -> Option<&HashMap<EdgeId, HandleConnection>> {
11 self.connection_lookup.get(&key)
12 }
13
14 pub fn connections_for_node(&self, node: NodeId) -> Option<&HashMap<EdgeId, HandleConnection>> {
15 self.connections(ConnectionLookupKey::Node(node))
16 }
17
18 pub fn connections_for_node_side(
19 &self,
20 node: NodeId,
21 side: ConnectionSide,
22 ) -> Option<&HashMap<EdgeId, HandleConnection>> {
23 self.connections(ConnectionLookupKey::NodeSide { node, side })
24 }
25
26 pub fn connections_for_port(
27 &self,
28 node: NodeId,
29 side: ConnectionSide,
30 port: PortId,
31 ) -> Option<&HashMap<EdgeId, HandleConnection>> {
32 self.connections(ConnectionLookupKey::NodeSidePort { node, side, port })
33 }
34
35 pub fn connection_for_edge(&self, edge: EdgeId) -> Option<HandleConnection> {
36 let entry = *self.edge_lookup.get(&edge)?;
37 Some(HandleConnection::from_edge_lookup(edge, entry))
38 }
39
40 fn add_connection(&mut self, key: ConnectionLookupKey, conn: HandleConnection) {
41 self.connection_lookup
42 .entry(key)
43 .or_default()
44 .insert(conn.edge, conn);
45 }
46
47 fn remove_connection(&mut self, key: ConnectionLookupKey, edge: EdgeId) {
48 let Some(map) = self.connection_lookup.get_mut(&key) else {
49 return;
50 };
51 map.remove(&edge);
52 if map.is_empty() {
53 self.connection_lookup.remove(&key);
54 }
55 }
56
57 pub(super) fn add_edge_connection(&mut self, entry: HandleConnection) {
58 for key in entry.lookup_keys() {
59 self.add_connection(key, entry);
60 }
61 }
62
63 pub(super) fn remove_edge_connection(&mut self, entry: HandleConnection) {
64 for key in entry.lookup_keys() {
65 self.remove_connection(key, entry.edge);
66 }
67 }
68
69 pub(super) fn slow_remove_edge_from_connection_lookup(&mut self, edge: EdgeId) {
70 let mut empty: Vec<ConnectionLookupKey> = Vec::new();
71 for (key, map) in &mut self.connection_lookup {
72 map.remove(&edge);
73 if map.is_empty() {
74 empty.push(*key);
75 }
76 }
77 for k in empty {
78 self.connection_lookup.remove(&k);
79 }
80 }
81
82 pub(super) fn connection_from_edge_lookup(&self, edge: EdgeId) -> Option<HandleConnection> {
83 self.connection_for_edge(edge)
84 }
85}