Skip to main content

jellyflow_runtime/runtime/connection/
reconnect.rs

1use serde::{Deserialize, Serialize};
2
3use crate::rules::{
4    ConnectPlan, Diagnostic, EdgeEndpoint, plan_reconnect_edge_with_mode_and_policy,
5};
6use crate::runtime::store::{DispatchError, DispatchOutcome, NodeGraphStore};
7use jellyflow_core::core::{EdgeId, PortId};
8use jellyflow_core::interaction::NodeGraphConnectionMode;
9use jellyflow_core::ops::GraphTransaction;
10
/// Label attached to the [`GraphTransaction`] built for a committed reconnect update.
///
/// Both transaction builders in this module pass it to `with_label`.
pub const RECONNECT_EDGE_TRANSACTION_LABEL: &str = "reconnect edge";
13
/// Rules-driven request for reconnecting one endpoint of an existing edge.
///
/// The request is plain data: it is `Copy`, hashable and (de)serializable with serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ReconnectEdgeRequest {
    /// Existing edge whose endpoint is being moved.
    pub edge: EdgeId,
    /// Which endpoint of `edge` is reconnected.
    pub endpoint: EdgeEndpoint,
    /// Port the selected endpoint should be attached to.
    pub new_port: PortId,
    /// Connection mode forwarded to the planner; when the field is missing from serialized
    /// input, `NodeGraphConnectionMode::default()` is used.
    #[serde(default)]
    pub mode: NodeGraphConnectionMode,
}
23
24impl ReconnectEdgeRequest {
25    pub fn new(
26        edge: EdgeId,
27        endpoint: EdgeEndpoint,
28        new_port: PortId,
29        mode: NodeGraphConnectionMode,
30    ) -> Self {
31        Self {
32            edge,
33            endpoint,
34            new_port,
35            mode,
36        }
37    }
38}
39
/// Error returned when a reconnect request could not be committed.
#[derive(Debug, thiserror::Error)]
pub enum ReconnectEdgeError {
    /// The reconnect plan was rejected; `diagnostics` carries the diagnostics taken from that
    /// plan.
    #[error("reconnect edge was rejected")]
    Rejected { diagnostics: Vec<Diagnostic> },
    /// Dispatching the reconnect transaction failed; the underlying [`DispatchError`] is
    /// surfaced transparently.
    #[error(transparent)]
    Dispatch(#[from] DispatchError),
}
48
49impl ReconnectEdgeError {
50    pub fn diagnostics(&self) -> Option<&[Diagnostic]> {
51        match self {
52            Self::Rejected { diagnostics } => Some(diagnostics),
53            Self::Dispatch(_) => None,
54        }
55    }
56}
57
58pub fn reconnect_edge_transaction(plan: &ConnectPlan) -> Option<GraphTransaction> {
59    if !plan.is_accept() || plan.ops().is_empty() {
60        return None;
61    }
62
63    Some(
64        GraphTransaction::from_ops(plan.ops().iter().cloned())
65            .with_label(RECONNECT_EDGE_TRANSACTION_LABEL),
66    )
67}
68
69fn reconnect_edge_transaction_from_plan(plan: ConnectPlan) -> Option<GraphTransaction> {
70    if !plan.is_accept() || plan.ops().is_empty() {
71        return None;
72    }
73
74    Some(GraphTransaction::from_ops(plan.into_ops()).with_label(RECONNECT_EDGE_TRANSACTION_LABEL))
75}
76
77impl NodeGraphStore {
78    /// Plans reconnecting one endpoint of an existing edge against the resolved interaction policy.
79    pub fn plan_reconnect_edge(&self, request: ReconnectEdgeRequest) -> ConnectPlan {
80        let interaction = self.resolved_interaction_state();
81        plan_reconnect_edge_with_mode_and_policy(
82            self.graph(),
83            request.edge,
84            request.endpoint,
85            request.new_port,
86            request.mode,
87            &interaction,
88        )
89    }
90
91    /// Commits a reconnect request through normal store dispatch.
92    pub fn apply_reconnect_edge(
93        &mut self,
94        request: ReconnectEdgeRequest,
95    ) -> Result<Option<DispatchOutcome>, ReconnectEdgeError> {
96        let plan = self.plan_reconnect_edge(request);
97        if plan.is_reject() {
98            return Err(ReconnectEdgeError::Rejected {
99                diagnostics: plan.diagnostics,
100            });
101        }
102
103        let Some(transaction) = reconnect_edge_transaction_from_plan(plan) else {
104            return Ok(None);
105        };
106
107        self.dispatch_transaction(&transaction)
108            .map(Some)
109            .map_err(ReconnectEdgeError::from)
110    }
111}