// jellyflow_runtime/runtime/connection/connect.rs
use serde::{Deserialize, Serialize};
2
3use crate::rules::{ConnectPlan, Diagnostic, plan_connect_with_mode_and_policy};
4use crate::runtime::store::{DispatchError, DispatchOutcome, NodeGraphStore};
5use jellyflow_core::core::{EdgeId, PortId};
6use jellyflow_core::interaction::NodeGraphConnectionMode;
7use jellyflow_core::ops::{GraphOp, GraphTransaction};
8
/// Label attached to every graph transaction produced by the connect-edge helpers
/// in this module.
pub const CONNECT_EDGE_TRANSACTION_LABEL: &str = "connect edge";
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub struct ConnectEdgeRequest {
15 pub from: PortId,
16 pub to: PortId,
17 #[serde(default, skip_serializing_if = "Option::is_none")]
18 pub edge: Option<EdgeId>,
19 #[serde(default)]
20 pub mode: NodeGraphConnectionMode,
21}
22
23impl ConnectEdgeRequest {
24 pub fn new(from: PortId, to: PortId, mode: NodeGraphConnectionMode) -> Self {
25 Self {
26 from,
27 to,
28 edge: None,
29 mode,
30 }
31 }
32
33 pub fn with_edge_id(mut self, edge: EdgeId) -> Self {
34 self.edge = Some(edge);
35 self
36 }
37}
38
39#[derive(Debug, thiserror::Error)]
41pub enum ConnectEdgeError {
42 #[error("connect edge was rejected")]
43 Rejected { diagnostics: Vec<Diagnostic> },
44 #[error(transparent)]
45 Dispatch(#[from] DispatchError),
46}
47
48impl ConnectEdgeError {
49 pub fn diagnostics(&self) -> Option<&[Diagnostic]> {
50 match self {
51 Self::Rejected { diagnostics } => Some(diagnostics),
52 Self::Dispatch(_) => None,
53 }
54 }
55}
56
57pub fn connect_edge_transaction(plan: &ConnectPlan) -> Option<GraphTransaction> {
58 connect_edge_transaction_with_optional_edge_id(plan, None)
59}
60
61pub fn connect_edge_transaction_with_edge_id(
62 plan: &ConnectPlan,
63 edge: EdgeId,
64) -> Option<GraphTransaction> {
65 connect_edge_transaction_with_optional_edge_id(plan, Some(edge))
66}
67
68fn connect_edge_transaction_with_optional_edge_id(
69 plan: &ConnectPlan,
70 edge: Option<EdgeId>,
71) -> Option<GraphTransaction> {
72 if !plan.is_accept() || plan.ops().is_empty() {
73 return None;
74 }
75
76 Some(
77 GraphTransaction::from_ops(connect_edge_ops(plan.ops().iter().cloned(), edge))
78 .with_label(CONNECT_EDGE_TRANSACTION_LABEL),
79 )
80}
81
82fn connect_edge_transaction_from_request(
83 plan: ConnectPlan,
84 request: ConnectEdgeRequest,
85) -> Option<GraphTransaction> {
86 if !plan.is_accept() || plan.ops().is_empty() {
87 return None;
88 }
89
90 Some(
91 GraphTransaction::from_ops(connect_edge_ops(plan.into_ops(), request.edge))
92 .with_label(CONNECT_EDGE_TRANSACTION_LABEL),
93 )
94}
95
96fn connect_edge_ops(
97 ops: impl IntoIterator<Item = GraphOp>,
98 edge: Option<EdgeId>,
99) -> impl Iterator<Item = GraphOp> {
100 ops.into_iter().map(move |op| match (edge, op) {
101 (Some(id), GraphOp::AddEdge { edge, .. }) => GraphOp::AddEdge { id, edge },
102 (_, op) => op,
103 })
104}
105
106impl NodeGraphStore {
107 pub fn plan_connect_edge(&self, request: ConnectEdgeRequest) -> ConnectPlan {
109 let interaction = self.resolved_interaction_state();
110 plan_connect_with_mode_and_policy(
111 self.graph(),
112 request.from,
113 request.to,
114 request.mode,
115 &interaction,
116 )
117 }
118
119 pub fn apply_connect_edge(
121 &mut self,
122 request: ConnectEdgeRequest,
123 ) -> Result<Option<DispatchOutcome>, ConnectEdgeError> {
124 let plan = self.plan_connect_edge(request);
125 if plan.is_reject() {
126 return Err(ConnectEdgeError::Rejected {
127 diagnostics: plan.diagnostics,
128 });
129 }
130
131 let Some(transaction) = connect_edge_transaction_from_request(plan, request) else {
132 return Ok(None);
133 };
134
135 self.dispatch_transaction(&transaction)
136 .map(Some)
137 .map_err(ConnectEdgeError::from)
138 }
139}