use std::borrow::Cow;
use serde::{Deserialize, Serialize};
use crate::TaskErrorCode;
use crate::workflow::ValueRef;
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
pub struct FlowError {
pub code: TaskErrorCode,
pub message: Cow<'static, str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<ValueRef>,
}
impl std::fmt::Display for FlowError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "error({}): {}", self.code, self.message)
}
}
impl FlowError {
pub fn new(code: TaskErrorCode, message: impl Into<Cow<'static, str>>) -> Self {
Self {
code,
message: message.into(),
data: None,
}
}
pub fn with_data<D: serde::Serialize>(self, data: D) -> Result<Self, serde_json::Error> {
let data = serde_json::to_value(data)?.into();
Ok(Self {
data: Some(data),
..self
})
}
pub fn from_error_stack<T: error_stack::Context>(report: error_stack::Report<T>) -> Self {
use crate::error_stack::ErrorStack;
let message = report.current_context().to_string();
let error_stack = ErrorStack::from_error_stack(report);
let data = match serde_json::to_value(&error_stack) {
Ok(value) => Some(ValueRef::new(value)),
Err(_) => None,
};
Self {
code: TaskErrorCode::OrchestratorError,
message: message.into(),
data,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum FlowResult {
Success(ValueRef),
Failed(FlowError),
}
struct FlowResultSuccess;
impl schemars::JsonSchema for FlowResultSuccess {
fn schema_name() -> std::borrow::Cow<'static, str> {
"FlowResultSuccess".into()
}
fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
let value_ref = generator.subschema_for::<ValueRef>();
schemars::json_schema!({
"description": "The step execution was successful.",
"type": "object",
"properties": {
"outcome": { "type": "string", "const": "success", "default": "success" },
"result": value_ref
},
"required": ["outcome", "result"]
})
}
}
struct FlowResultFailed;
impl schemars::JsonSchema for FlowResultFailed {
fn schema_name() -> std::borrow::Cow<'static, str> {
"FlowResultFailed".into()
}
fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
let flow_error_ref = generator.subschema_for::<FlowError>();
schemars::json_schema!({
"description": "The step failed with the given error.",
"type": "object",
"properties": {
"outcome": { "type": "string", "const": "failed", "default": "failed" },
"error": flow_error_ref
},
"required": ["outcome", "error"]
})
}
}
impl schemars::JsonSchema for FlowResult {
fn schema_name() -> std::borrow::Cow<'static, str> {
"FlowResult".into()
}
fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
let success_ref = generator.subschema_for::<FlowResultSuccess>();
let failed_ref = generator.subschema_for::<FlowResultFailed>();
schemars::json_schema!({
"oneOf": [success_ref, failed_ref],
"discriminator": {
"propertyName": "outcome",
"mapping": {
"success": "#/$defs/FlowResultSuccess",
"failed": "#/$defs/FlowResultFailed"
}
}
})
}
}
impl Serialize for FlowResult {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct as _;
match self {
FlowResult::Success(result) => {
let mut state = serializer.serialize_struct("FlowResult", 2)?;
state.serialize_field("outcome", "success")?;
state.serialize_field("result", result)?;
state.end()
}
FlowResult::Failed(error) => {
let mut state = serializer.serialize_struct("FlowResult", 2)?;
state.serialize_field("outcome", "failed")?;
state.serialize_field("error", error)?;
state.end()
}
}
}
}
impl<'de> Deserialize<'de> for FlowResult {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error as _;
let value = serde_json::Value::deserialize(deserializer)?;
let outcome = value
.get("outcome")
.and_then(|v| v.as_str())
.ok_or_else(|| D::Error::missing_field("outcome"))?;
match outcome {
"success" => {
let result = value
.get("result")
.ok_or_else(|| D::Error::missing_field("result"))?;
let result_ref = ValueRef::new(result.clone());
Ok(FlowResult::Success(result_ref))
}
"failed" => {
let error = FlowError::deserialize(
value
.get("error")
.ok_or_else(|| D::Error::missing_field("error"))?,
)
.map_err(D::Error::custom)?;
Ok(FlowResult::Failed(error))
}
_ => Err(D::Error::unknown_variant(outcome, &["success", "failed"])),
}
}
}
impl From<serde_json::Value> for FlowResult {
fn from(value: serde_json::Value) -> Self {
let result = ValueRef::new(value);
Self::Success(result)
}
}
impl FlowResult {
pub fn success(&self) -> Option<ValueRef> {
match self {
Self::Success(result) => Some(result.clone()),
_ => None,
}
}
pub fn failed(&self) -> Option<&FlowError> {
match self {
Self::Failed(error) => Some(error),
_ => None,
}
}
pub fn is_transport_error(&self) -> bool {
matches!(
self,
Self::Failed(e) if matches!(e.code, TaskErrorCode::Unreachable | TaskErrorCode::Timeout)
)
}
pub fn is_component_execution_error(&self) -> bool {
matches!(
self,
Self::Failed(e) if matches!(e.code, TaskErrorCode::ComponentFailed | TaskErrorCode::ResourceUnavailable)
)
}
#[cfg(test)]
pub fn unwrap_success(self) -> ValueRef {
match self {
Self::Success(result) => result,
Self::Failed(error) => {
panic!("Expected Success, got Failed: {}", error)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_is_transport_error() {
assert!(
FlowResult::Failed(FlowError::new(TaskErrorCode::Unreachable, "test"))
.is_transport_error()
);
assert!(
FlowResult::Failed(FlowError::new(TaskErrorCode::Timeout, "test")).is_transport_error()
);
assert!(
!FlowResult::Failed(FlowError::new(TaskErrorCode::ComponentFailed, "test"))
.is_transport_error()
);
}
#[test]
fn test_is_component_execution_error() {
assert!(
FlowResult::Failed(FlowError::new(TaskErrorCode::ComponentFailed, "test"))
.is_component_execution_error()
);
assert!(
FlowResult::Failed(FlowError::new(TaskErrorCode::ResourceUnavailable, "test"))
.is_component_execution_error()
);
assert!(
!FlowResult::Failed(FlowError::new(TaskErrorCode::Unreachable, "test"))
.is_component_execution_error()
);
}
}