// jellyflow_runtime/runtime/connection/connect.rs

use serde::{Deserialize, Serialize};

use crate::rules::{ConnectPlan, Diagnostic, plan_connect_with_mode_and_policy};
use crate::runtime::store::{DispatchError, DispatchOutcome, NodeGraphStore};
use jellyflow_core::core::{EdgeId, PortId};
use jellyflow_core::interaction::NodeGraphConnectionMode;
use jellyflow_core::ops::{GraphOp, GraphTransaction};

/// Default transaction label used for committed connect updates.
///
/// The `connect_edge_transaction*` helpers in this module attach this label to
/// every transaction they build via `GraphTransaction::with_label`.
pub const CONNECT_EDGE_TRANSACTION_LABEL: &str = "connect edge";

12/// Rules-driven request for connecting two existing ports.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub struct ConnectEdgeRequest {
15    pub from: PortId,
16    pub to: PortId,
17    #[serde(default, skip_serializing_if = "Option::is_none")]
18    pub edge: Option<EdgeId>,
19    #[serde(default)]
20    pub mode: NodeGraphConnectionMode,
21}
22
23impl ConnectEdgeRequest {
24    pub fn new(from: PortId, to: PortId, mode: NodeGraphConnectionMode) -> Self {
25        Self {
26            from,
27            to,
28            edge: None,
29            mode,
30        }
31    }
32
33    pub fn with_edge_id(mut self, edge: EdgeId) -> Self {
34        self.edge = Some(edge);
35        self
36    }
37}
38
39/// Error returned when a connect request could not be committed.
40#[derive(Debug, thiserror::Error)]
41pub enum ConnectEdgeError {
42    #[error("connect edge was rejected")]
43    Rejected { diagnostics: Vec<Diagnostic> },
44    #[error(transparent)]
45    Dispatch(#[from] DispatchError),
46}
47
48impl ConnectEdgeError {
49    pub fn diagnostics(&self) -> Option<&[Diagnostic]> {
50        match self {
51            Self::Rejected { diagnostics } => Some(diagnostics),
52            Self::Dispatch(_) => None,
53        }
54    }
55}
56
57pub fn connect_edge_transaction(plan: &ConnectPlan) -> Option<GraphTransaction> {
58    connect_edge_transaction_with_optional_edge_id(plan, None)
59}
60
61pub fn connect_edge_transaction_with_edge_id(
62    plan: &ConnectPlan,
63    edge: EdgeId,
64) -> Option<GraphTransaction> {
65    connect_edge_transaction_with_optional_edge_id(plan, Some(edge))
66}
67
68fn connect_edge_transaction_with_optional_edge_id(
69    plan: &ConnectPlan,
70    edge: Option<EdgeId>,
71) -> Option<GraphTransaction> {
72    if !plan.is_accept() || plan.ops().is_empty() {
73        return None;
74    }
75
76    Some(
77        GraphTransaction::from_ops(connect_edge_ops(plan.ops().iter().cloned(), edge))
78            .with_label(CONNECT_EDGE_TRANSACTION_LABEL),
79    )
80}
81
82fn connect_edge_transaction_from_request(
83    plan: ConnectPlan,
84    request: ConnectEdgeRequest,
85) -> Option<GraphTransaction> {
86    if !plan.is_accept() || plan.ops().is_empty() {
87        return None;
88    }
89
90    Some(
91        GraphTransaction::from_ops(connect_edge_ops(plan.into_ops(), request.edge))
92            .with_label(CONNECT_EDGE_TRANSACTION_LABEL),
93    )
94}
95
96fn connect_edge_ops(
97    ops: impl IntoIterator<Item = GraphOp>,
98    edge: Option<EdgeId>,
99) -> impl Iterator<Item = GraphOp> {
100    ops.into_iter().map(move |op| match (edge, op) {
101        (Some(id), GraphOp::AddEdge { edge, .. }) => GraphOp::AddEdge { id, edge },
102        (_, op) => op,
103    })
104}
105
106impl NodeGraphStore {
107    /// Plans connecting two existing ports against the resolved interaction policy.
108    pub fn plan_connect_edge(&self, request: ConnectEdgeRequest) -> ConnectPlan {
109        let interaction = self.resolved_interaction_state();
110        plan_connect_with_mode_and_policy(
111            self.graph(),
112            request.from,
113            request.to,
114            request.mode,
115            &interaction,
116        )
117    }
118
119    /// Commits a connect request through normal store dispatch.
120    pub fn apply_connect_edge(
121        &mut self,
122        request: ConnectEdgeRequest,
123    ) -> Result<Option<DispatchOutcome>, ConnectEdgeError> {
124        let plan = self.plan_connect_edge(request);
125        if plan.is_reject() {
126            return Err(ConnectEdgeError::Rejected {
127                diagnostics: plan.diagnostics,
128            });
129        }
130
131        let Some(transaction) = connect_edge_transaction_from_request(plan, request) else {
132            return Ok(None);
133        };
134
135        self.dispatch_transaction(&transaction)
136            .map(Some)
137            .map_err(ConnectEdgeError::from)
138    }
139}