jellyflow_runtime/runtime/connection/
reconnect.rs1use serde::{Deserialize, Serialize};
2
3use crate::rules::{
4 ConnectPlan, Diagnostic, EdgeEndpoint, plan_reconnect_edge_with_mode_and_policy,
5};
6use crate::runtime::store::{DispatchError, DispatchOutcome, NodeGraphStore};
7use jellyflow_core::core::{EdgeId, PortId};
8use jellyflow_core::interaction::NodeGraphConnectionMode;
9use jellyflow_core::ops::GraphTransaction;
10
/// Label attached to every [`GraphTransaction`] built in this module for an edge reconnect.
pub const RECONNECT_EDGE_TRANSACTION_LABEL: &str = "reconnect edge";
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct ReconnectEdgeRequest {
17 pub edge: EdgeId,
18 pub endpoint: EdgeEndpoint,
19 pub new_port: PortId,
20 #[serde(default)]
21 pub mode: NodeGraphConnectionMode,
22}
23
24impl ReconnectEdgeRequest {
25 pub fn new(
26 edge: EdgeId,
27 endpoint: EdgeEndpoint,
28 new_port: PortId,
29 mode: NodeGraphConnectionMode,
30 ) -> Self {
31 Self {
32 edge,
33 endpoint,
34 new_port,
35 mode,
36 }
37 }
38}
39
40#[derive(Debug, thiserror::Error)]
42pub enum ReconnectEdgeError {
43 #[error("reconnect edge was rejected")]
44 Rejected { diagnostics: Vec<Diagnostic> },
45 #[error(transparent)]
46 Dispatch(#[from] DispatchError),
47}
48
49impl ReconnectEdgeError {
50 pub fn diagnostics(&self) -> Option<&[Diagnostic]> {
51 match self {
52 Self::Rejected { diagnostics } => Some(diagnostics),
53 Self::Dispatch(_) => None,
54 }
55 }
56}
57
58pub fn reconnect_edge_transaction(plan: &ConnectPlan) -> Option<GraphTransaction> {
59 if !plan.is_accept() || plan.ops().is_empty() {
60 return None;
61 }
62
63 Some(
64 GraphTransaction::from_ops(plan.ops().iter().cloned())
65 .with_label(RECONNECT_EDGE_TRANSACTION_LABEL),
66 )
67}
68
69fn reconnect_edge_transaction_from_plan(plan: ConnectPlan) -> Option<GraphTransaction> {
70 if !plan.is_accept() || plan.ops().is_empty() {
71 return None;
72 }
73
74 Some(GraphTransaction::from_ops(plan.into_ops()).with_label(RECONNECT_EDGE_TRANSACTION_LABEL))
75}
76
77impl NodeGraphStore {
78 pub fn plan_reconnect_edge(&self, request: ReconnectEdgeRequest) -> ConnectPlan {
80 let interaction = self.resolved_interaction_state();
81 plan_reconnect_edge_with_mode_and_policy(
82 self.graph(),
83 request.edge,
84 request.endpoint,
85 request.new_port,
86 request.mode,
87 &interaction,
88 )
89 }
90
91 pub fn apply_reconnect_edge(
93 &mut self,
94 request: ReconnectEdgeRequest,
95 ) -> Result<Option<DispatchOutcome>, ReconnectEdgeError> {
96 let plan = self.plan_reconnect_edge(request);
97 if plan.is_reject() {
98 return Err(ReconnectEdgeError::Rejected {
99 diagnostics: plan.diagnostics,
100 });
101 }
102
103 let Some(transaction) = reconnect_edge_transaction_from_plan(plan) else {
104 return Ok(None);
105 };
106
107 self.dispatch_transaction(&transaction)
108 .map(Some)
109 .map_err(ReconnectEdgeError::from)
110 }
111}