stepflow_flow/
flow_result.rs1use std::borrow::Cow;
14
15use serde::{Deserialize, Serialize};
16
17use crate::TaskErrorCode;
18use crate::workflow::ValueRef;
19
20#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
22pub struct FlowError {
23 pub code: TaskErrorCode,
24 pub message: Cow<'static, str>,
25 #[serde(default, skip_serializing_if = "Option::is_none")]
26 pub data: Option<ValueRef>,
27}
28
29impl std::fmt::Display for FlowError {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(f, "error({}): {}", self.code, self.message)
32 }
33}
34
35impl FlowError {
36 pub fn new(code: TaskErrorCode, message: impl Into<Cow<'static, str>>) -> Self {
37 Self {
38 code,
39 message: message.into(),
40 data: None,
41 }
42 }
43
44 pub fn with_data<D: serde::Serialize>(self, data: D) -> Result<Self, serde_json::Error> {
45 let data = serde_json::to_value(data)?.into();
46 Ok(Self {
47 data: Some(data),
48 ..self
49 })
50 }
51
52 pub fn from_error_stack<T: error_stack::Context>(report: error_stack::Report<T>) -> Self {
54 use crate::error_stack::ErrorStack;
55
56 let message = report.current_context().to_string();
57 let error_stack = ErrorStack::from_error_stack(report);
58 let data = match serde_json::to_value(&error_stack) {
59 Ok(value) => Some(ValueRef::new(value)),
60 Err(_) => None,
61 };
62
63 Self {
64 code: TaskErrorCode::OrchestratorError,
65 message: message.into(),
66 data,
67 }
68 }
69}
70
71#[derive(Debug, Clone, PartialEq)]
73pub enum FlowResult {
74 Success(ValueRef),
76 Failed(FlowError),
78}
79
80struct FlowResultSuccess;
85
86impl schemars::JsonSchema for FlowResultSuccess {
87 fn schema_name() -> std::borrow::Cow<'static, str> {
88 "FlowResultSuccess".into()
89 }
90
91 fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
92 let value_ref = generator.subschema_for::<ValueRef>();
93 schemars::json_schema!({
94 "description": "The step execution was successful.",
95 "type": "object",
96 "properties": {
97 "outcome": { "type": "string", "const": "success", "default": "success" },
98 "result": value_ref
99 },
100 "required": ["outcome", "result"]
101 })
102 }
103}
104
105struct FlowResultFailed;
107
108impl schemars::JsonSchema for FlowResultFailed {
109 fn schema_name() -> std::borrow::Cow<'static, str> {
110 "FlowResultFailed".into()
111 }
112
113 fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
114 let flow_error_ref = generator.subschema_for::<FlowError>();
115 schemars::json_schema!({
116 "description": "The step failed with the given error.",
117 "type": "object",
118 "properties": {
119 "outcome": { "type": "string", "const": "failed", "default": "failed" },
120 "error": flow_error_ref
121 },
122 "required": ["outcome", "error"]
123 })
124 }
125}
126
127impl schemars::JsonSchema for FlowResult {
128 fn schema_name() -> std::borrow::Cow<'static, str> {
129 "FlowResult".into()
130 }
131
132 fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
133 let success_ref = generator.subschema_for::<FlowResultSuccess>();
134 let failed_ref = generator.subschema_for::<FlowResultFailed>();
135
136 schemars::json_schema!({
137 "oneOf": [success_ref, failed_ref],
138 "discriminator": {
139 "propertyName": "outcome",
140 "mapping": {
141 "success": "#/$defs/FlowResultSuccess",
142 "failed": "#/$defs/FlowResultFailed"
143 }
144 }
145 })
146 }
147}
148
149impl Serialize for FlowResult {
150 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151 where
152 S: serde::Serializer,
153 {
154 use serde::ser::SerializeStruct as _;
155
156 match self {
157 FlowResult::Success(result) => {
158 let mut state = serializer.serialize_struct("FlowResult", 2)?;
159 state.serialize_field("outcome", "success")?;
160 state.serialize_field("result", result)?;
161 state.end()
162 }
163 FlowResult::Failed(error) => {
164 let mut state = serializer.serialize_struct("FlowResult", 2)?;
165 state.serialize_field("outcome", "failed")?;
166 state.serialize_field("error", error)?;
167 state.end()
168 }
169 }
170 }
171}
172
173impl<'de> Deserialize<'de> for FlowResult {
174 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
175 where
176 D: serde::Deserializer<'de>,
177 {
178 use serde::de::Error as _;
179
180 let value = serde_json::Value::deserialize(deserializer)?;
181 let outcome = value
182 .get("outcome")
183 .and_then(|v| v.as_str())
184 .ok_or_else(|| D::Error::missing_field("outcome"))?;
185
186 match outcome {
187 "success" => {
188 let result = value
189 .get("result")
190 .ok_or_else(|| D::Error::missing_field("result"))?;
191 let result_ref = ValueRef::new(result.clone());
192 Ok(FlowResult::Success(result_ref))
193 }
194 "failed" => {
195 let error = FlowError::deserialize(
196 value
197 .get("error")
198 .ok_or_else(|| D::Error::missing_field("error"))?,
199 )
200 .map_err(D::Error::custom)?;
201 Ok(FlowResult::Failed(error))
202 }
203 _ => Err(D::Error::unknown_variant(outcome, &["success", "failed"])),
204 }
205 }
206}
207
208impl From<serde_json::Value> for FlowResult {
209 fn from(value: serde_json::Value) -> Self {
210 let result = ValueRef::new(value);
211 Self::Success(result)
212 }
213}
214
215impl FlowResult {
216 pub fn success(&self) -> Option<ValueRef> {
217 match self {
218 Self::Success(result) => Some(result.clone()),
219 _ => None,
220 }
221 }
222
223 pub fn failed(&self) -> Option<&FlowError> {
224 match self {
225 Self::Failed(error) => Some(error),
226 _ => None,
227 }
228 }
229
230 pub fn is_transport_error(&self) -> bool {
232 matches!(
233 self,
234 Self::Failed(e) if matches!(e.code, TaskErrorCode::Unreachable | TaskErrorCode::Timeout)
235 )
236 }
237
238 pub fn is_component_execution_error(&self) -> bool {
241 matches!(
242 self,
243 Self::Failed(e) if matches!(e.code, TaskErrorCode::ComponentFailed | TaskErrorCode::ResourceUnavailable)
244 )
245 }
246
247 #[cfg(test)]
251 pub fn unwrap_success(self) -> ValueRef {
252 match self {
253 Self::Success(result) => result,
254 Self::Failed(error) => {
255 panic!("Expected Success, got Failed: {}", error)
256 }
257 }
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use super::*;
264
265 #[test]
266 fn test_is_transport_error() {
267 assert!(
268 FlowResult::Failed(FlowError::new(TaskErrorCode::Unreachable, "test"))
269 .is_transport_error()
270 );
271 assert!(
272 FlowResult::Failed(FlowError::new(TaskErrorCode::Timeout, "test")).is_transport_error()
273 );
274 assert!(
275 !FlowResult::Failed(FlowError::new(TaskErrorCode::ComponentFailed, "test"))
276 .is_transport_error()
277 );
278 }
279
280 #[test]
281 fn test_is_component_execution_error() {
282 assert!(
283 FlowResult::Failed(FlowError::new(TaskErrorCode::ComponentFailed, "test"))
284 .is_component_execution_error()
285 );
286 assert!(
287 FlowResult::Failed(FlowError::new(TaskErrorCode::ResourceUnavailable, "test"))
288 .is_component_execution_error()
289 );
290 assert!(
291 !FlowResult::Failed(FlowError::new(TaskErrorCode::Unreachable, "test"))
292 .is_component_execution_error()
293 );
294 }
295}