Skip to main content

stepflow_flow/
flow_result.rs

1// Copyright 2025 DataStax Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4// in compliance with the License. You may obtain a copy of the License at
5//
6//     http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software distributed under the License
9// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10// or implied. See the License for the specific language governing permissions and limitations under
11// the License.
12
13use std::borrow::Cow;
14
15use serde::{Deserialize, Serialize};
16
17use crate::TaskErrorCode;
18use crate::workflow::ValueRef;
19
20/// An error reported from within a flow or step.
21#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
22pub struct FlowError {
23    pub code: TaskErrorCode,
24    pub message: Cow<'static, str>,
25    #[serde(default, skip_serializing_if = "Option::is_none")]
26    pub data: Option<ValueRef>,
27}
28
29impl std::fmt::Display for FlowError {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        write!(f, "error({}): {}", self.code, self.message)
32    }
33}
34
35impl FlowError {
36    pub fn new(code: TaskErrorCode, message: impl Into<Cow<'static, str>>) -> Self {
37        Self {
38            code,
39            message: message.into(),
40            data: None,
41        }
42    }
43
44    pub fn with_data<D: serde::Serialize>(self, data: D) -> Result<Self, serde_json::Error> {
45        let data = serde_json::to_value(data)?.into();
46        Ok(Self {
47            data: Some(data),
48            ..self
49        })
50    }
51
52    /// Create a FlowError from an error_stack::Report, preserving the full stack trace.
53    pub fn from_error_stack<T: error_stack::Context>(report: error_stack::Report<T>) -> Self {
54        use crate::error_stack::ErrorStack;
55
56        let message = report.current_context().to_string();
57        let error_stack = ErrorStack::from_error_stack(report);
58        let data = match serde_json::to_value(&error_stack) {
59            Ok(value) => Some(ValueRef::new(value)),
60            Err(_) => None,
61        };
62
63        Self {
64            code: TaskErrorCode::OrchestratorError,
65            message: message.into(),
66            data,
67        }
68    }
69}
70
71/// The results of a step execution.
72#[derive(Debug, Clone, PartialEq)]
73pub enum FlowResult {
74    /// The step execution was successful.
75    Success(ValueRef),
76    /// The step failed with the given error.
77    Failed(FlowError),
78}
79
80/// Schema for the success variant of [`FlowResult`].
81///
82/// This is a standalone type so that code generators (e.g. datamodel-code-generator)
83/// emit it as a named class in `$defs` rather than an anonymous inline schema.
84struct FlowResultSuccess;
85
86impl schemars::JsonSchema for FlowResultSuccess {
87    fn schema_name() -> std::borrow::Cow<'static, str> {
88        "FlowResultSuccess".into()
89    }
90
91    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
92        let value_ref = generator.subschema_for::<ValueRef>();
93        schemars::json_schema!({
94            "description": "The step execution was successful.",
95            "type": "object",
96            "properties": {
97                "outcome": { "type": "string", "const": "success", "default": "success" },
98                "result": value_ref
99            },
100            "required": ["outcome", "result"]
101        })
102    }
103}
104
105/// Schema for the failed variant of [`FlowResult`].
106struct FlowResultFailed;
107
108impl schemars::JsonSchema for FlowResultFailed {
109    fn schema_name() -> std::borrow::Cow<'static, str> {
110        "FlowResultFailed".into()
111    }
112
113    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
114        let flow_error_ref = generator.subschema_for::<FlowError>();
115        schemars::json_schema!({
116            "description": "The step failed with the given error.",
117            "type": "object",
118            "properties": {
119                "outcome": { "type": "string", "const": "failed", "default": "failed" },
120                "error": flow_error_ref
121            },
122            "required": ["outcome", "error"]
123        })
124    }
125}
126
127impl schemars::JsonSchema for FlowResult {
128    fn schema_name() -> std::borrow::Cow<'static, str> {
129        "FlowResult".into()
130    }
131
132    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
133        let success_ref = generator.subschema_for::<FlowResultSuccess>();
134        let failed_ref = generator.subschema_for::<FlowResultFailed>();
135
136        schemars::json_schema!({
137            "oneOf": [success_ref, failed_ref],
138            "discriminator": {
139                "propertyName": "outcome",
140                "mapping": {
141                    "success": "#/$defs/FlowResultSuccess",
142                    "failed": "#/$defs/FlowResultFailed"
143                }
144            }
145        })
146    }
147}
148
149impl Serialize for FlowResult {
150    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151    where
152        S: serde::Serializer,
153    {
154        use serde::ser::SerializeStruct as _;
155
156        match self {
157            FlowResult::Success(result) => {
158                let mut state = serializer.serialize_struct("FlowResult", 2)?;
159                state.serialize_field("outcome", "success")?;
160                state.serialize_field("result", result)?;
161                state.end()
162            }
163            FlowResult::Failed(error) => {
164                let mut state = serializer.serialize_struct("FlowResult", 2)?;
165                state.serialize_field("outcome", "failed")?;
166                state.serialize_field("error", error)?;
167                state.end()
168            }
169        }
170    }
171}
172
173impl<'de> Deserialize<'de> for FlowResult {
174    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
175    where
176        D: serde::Deserializer<'de>,
177    {
178        use serde::de::Error as _;
179
180        let value = serde_json::Value::deserialize(deserializer)?;
181        let outcome = value
182            .get("outcome")
183            .and_then(|v| v.as_str())
184            .ok_or_else(|| D::Error::missing_field("outcome"))?;
185
186        match outcome {
187            "success" => {
188                let result = value
189                    .get("result")
190                    .ok_or_else(|| D::Error::missing_field("result"))?;
191                let result_ref = ValueRef::new(result.clone());
192                Ok(FlowResult::Success(result_ref))
193            }
194            "failed" => {
195                let error = FlowError::deserialize(
196                    value
197                        .get("error")
198                        .ok_or_else(|| D::Error::missing_field("error"))?,
199                )
200                .map_err(D::Error::custom)?;
201                Ok(FlowResult::Failed(error))
202            }
203            _ => Err(D::Error::unknown_variant(outcome, &["success", "failed"])),
204        }
205    }
206}
207
208impl From<serde_json::Value> for FlowResult {
209    fn from(value: serde_json::Value) -> Self {
210        let result = ValueRef::new(value);
211        Self::Success(result)
212    }
213}
214
215impl FlowResult {
216    pub fn success(&self) -> Option<ValueRef> {
217        match self {
218            Self::Success(result) => Some(result.clone()),
219            _ => None,
220        }
221    }
222
223    pub fn failed(&self) -> Option<&FlowError> {
224        match self {
225            Self::Failed(error) => Some(error),
226            _ => None,
227        }
228    }
229
230    /// Returns true if this is a transport/infrastructure error (always retried).
231    pub fn is_transport_error(&self) -> bool {
232        matches!(
233            self,
234            Self::Failed(e) if matches!(e.code, TaskErrorCode::Unreachable | TaskErrorCode::Timeout)
235        )
236    }
237
238    /// Returns true if this is a component execution error
239    /// (retryable with `onError: { action: retry }`).
240    pub fn is_component_execution_error(&self) -> bool {
241        matches!(
242            self,
243            Self::Failed(e) if matches!(e.code, TaskErrorCode::ComponentFailed | TaskErrorCode::ResourceUnavailable)
244        )
245    }
246
247    /// Unwrap a successful result, panicking if the result is not Success.
248    ///
249    /// This is primarily useful for testing where we expect a successful result.
250    #[cfg(test)]
251    pub fn unwrap_success(self) -> ValueRef {
252        match self {
253            Self::Success(result) => result,
254            Self::Failed(error) => {
255                panic!("Expected Success, got Failed: {}", error)
256            }
257        }
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a failed result carrying `code` and a placeholder message.
    fn failed_with(code: TaskErrorCode) -> FlowResult {
        FlowResult::Failed(FlowError::new(code, "test"))
    }

    /// `Unreachable` and `Timeout` are transport errors; `ComponentFailed` is not.
    #[test]
    fn test_is_transport_error() {
        assert!(failed_with(TaskErrorCode::Unreachable).is_transport_error());
        assert!(failed_with(TaskErrorCode::Timeout).is_transport_error());
        assert!(!failed_with(TaskErrorCode::ComponentFailed).is_transport_error());
    }

    /// `ComponentFailed` and `ResourceUnavailable` are component execution errors;
    /// `Unreachable` is not.
    #[test]
    fn test_is_component_execution_error() {
        assert!(failed_with(TaskErrorCode::ComponentFailed).is_component_execution_error());
        assert!(failed_with(TaskErrorCode::ResourceUnavailable).is_component_execution_error());
        assert!(!failed_with(TaskErrorCode::Unreachable).is_component_execution_error());
    }
}