use crate::common::reusable::ReusableOr;
use crate::v1_1::criterion::Criterion;
use crate::v1_1::failure_action::FailureAction;
use crate::v1_1::parameter::Parameter;
use crate::v1_1::request_body::RequestBody;
use crate::v1_1::selector::ValueOrSelector;
use crate::v1_1::success_action::SuccessAction;
use crate::validation::{Context, ValidateWithContext};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum StepAction {
#[default]
Send,
Receive,
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Default)]
pub struct Step {
#[serde(rename = "stepId")]
pub step_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<i64>,
#[serde(rename = "dependsOn", default, skip_serializing_if = "Vec::is_empty")]
pub depends_on: Vec<String>,
#[serde(rename = "operationId", skip_serializing_if = "Option::is_none")]
pub operation_id: Option<String>,
#[serde(rename = "operationPath", skip_serializing_if = "Option::is_none")]
pub operation_path: Option<String>,
#[serde(rename = "channelPath", skip_serializing_if = "Option::is_none")]
pub channel_path: Option<String>,
#[serde(rename = "correlationId", skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub action: Option<StepAction>,
#[serde(rename = "workflowId", skip_serializing_if = "Option::is_none")]
pub workflow_id: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub parameters: Vec<ReusableOr<Parameter>>,
#[serde(rename = "requestBody", skip_serializing_if = "Option::is_none")]
pub request_body: Option<RequestBody>,
#[serde(
rename = "successCriteria",
default,
skip_serializing_if = "Vec::is_empty"
)]
pub success_criteria: Vec<Criterion>,
#[serde(rename = "onSuccess", default, skip_serializing_if = "Vec::is_empty")]
pub on_success: Vec<ReusableOr<SuccessAction>>,
#[serde(rename = "onFailure", default, skip_serializing_if = "Vec::is_empty")]
pub on_failure: Vec<ReusableOr<FailureAction>>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub outputs: BTreeMap<String, ValueOrSelector>,
#[serde(flatten)]
#[serde(with = "crate::common::extensions")]
#[serde(skip_serializing_if = "Option::is_none")]
pub extensions: Option<BTreeMap<String, serde_json::Value>>,
}
impl ValidateWithContext for Step {
fn validate_with_context(&self, ctx: &mut Context) {
ctx.require_non_empty("stepId", &self.step_id);
let has_op_id = self.operation_id.is_some();
let has_op_path = self.operation_path.is_some();
let has_channel = self.channel_path.is_some();
let has_workflow = self.workflow_id.is_some();
let is_async = has_channel || self.action.is_some() || self.correlation_id.is_some();
let is_operation = if has_workflow {
if has_op_id || has_op_path || has_channel || self.action.is_some() {
ctx.error(
"a workflow step (`workflowId`) must not set `operationId`, `operationPath`, `channelPath`, or `action`",
);
}
false
} else if is_async {
if self.action.is_none() {
ctx.error_field("action", "is required for an AsyncAPI step");
}
match (has_op_id, has_channel) {
(true, true) => ctx.error("`operationId` and `channelPath` are mutually exclusive"),
(false, false) => {
ctx.error("an AsyncAPI step must set `operationId` or `channelPath`")
}
_ => {}
}
if has_op_path {
ctx.error("`operationPath` is not valid on an AsyncAPI step");
}
if self.correlation_id.is_some() && self.action != Some(StepAction::Receive) {
ctx.error_field("correlationId", "is only valid when `action` is `receive`");
}
true
} else {
match (has_op_id, has_op_path) {
(true, true) => {
ctx.error("`operationId` and `operationPath` are mutually exclusive")
}
(false, false) => ctx.error(
"must set exactly one of `operationId`, `operationPath`, `channelPath`, or `workflowId`",
),
_ => {}
}
true
};
for (i, parameter) in self.parameters.iter().enumerate() {
ctx.in_index("parameters", i, |ctx| {
parameter.validate_with_context(ctx);
if is_operation
&& let ReusableOr::Item(p) = parameter
&& p.in_.is_none()
{
ctx.error_field("in", "is required for operation steps");
}
});
}
if let Some(request_body) = &self.request_body {
ctx.in_field("requestBody", |ctx| request_body.validate_with_context(ctx));
}
for (i, criterion) in self.success_criteria.iter().enumerate() {
ctx.in_index("successCriteria", i, |ctx| {
criterion.validate_with_context(ctx)
});
}
for (i, action) in self.on_success.iter().enumerate() {
ctx.in_index("onSuccess", i, |ctx| action.validate_with_context(ctx));
}
for (i, action) in self.on_failure.iter().enumerate() {
ctx.in_index("onFailure", i, |ctx| action.validate_with_context(ctx));
}
ctx.validate_map_keys("outputs", &self.outputs);
for (name, output) in &self.outputs {
ctx.in_key("outputs", name, |ctx| output.validate_with_context(ctx));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use enumset::EnumSet;
use serde_json::json;
fn validate(step: &Step) -> Vec<String> {
let mut ctx = Context::with_path(EnumSet::empty(), "#.steps[0]");
step.validate_with_context(&mut ctx);
ctx.errors.iter().map(ToString::to_string).collect()
}
#[test]
fn openapi_step_round_trips() {
let step: Step = serde_json::from_value(json!({
"stepId": "findPet",
"operationId": "getPetById",
"timeout": 5000,
"parameters": [ { "name": "petId", "in": "path", "value": "$inputs.petId" } ],
"successCriteria": [ { "condition": "$statusCode == 200" } ],
}))
.unwrap();
assert_eq!(step.timeout, Some(5000));
assert!(validate(&step).is_empty());
}
#[test]
fn asyncapi_step_round_trips() {
let step: Step = serde_json::from_value(json!({
"stepId": "listen",
"channelPath": "$sourceDescriptions.events#/channels/pets",
"action": "receive",
"correlationId": "$message.headers.id",
}))
.unwrap();
assert_eq!(step.action, Some(StepAction::Receive));
assert!(validate(&step).is_empty());
}
#[test]
fn asyncapi_step_requires_action() {
let step = Step {
step_id: "s".into(),
channel_path: Some("$src#/c".into()),
..Default::default()
};
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0].action: is required for an AsyncAPI step")
);
}
#[test]
fn correlation_id_requires_receive() {
let step = Step {
step_id: "s".into(),
channel_path: Some("$src#/c".into()),
action: Some(StepAction::Send),
correlation_id: Some(json!("x")),
..Default::default()
};
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0].correlationId: is only valid when `action` is `receive`")
);
}
#[test]
fn workflow_step_rejects_operation_fields() {
let step = Step {
step_id: "s".into(),
workflow_id: Some("wf".into()),
operation_id: Some("op".into()),
..Default::default()
};
assert!(validate(&step).iter().any(|e| e.contains("must not set")));
}
#[test]
fn step_with_no_target_is_rejected() {
let step = Step {
step_id: "s".into(),
..Default::default()
};
assert!(validate(&step).iter().any(|e| e.contains("exactly one of")));
}
#[test]
fn openapi_operation_parameter_requires_in() {
let step = Step {
step_id: "s".into(),
operation_id: Some("op".into()),
parameters: vec![ReusableOr::Item(Parameter {
name: "p".into(),
value: ValueOrSelector::literal("v"),
..Default::default()
})],
..Default::default()
};
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0].parameters[0].in: is required for operation steps")
);
}
#[test]
fn async_operation_id_and_channel_path_are_mutually_exclusive() {
let step = Step {
step_id: "s".into(),
operation_id: Some("op".into()),
channel_path: Some("$src#/c".into()),
action: Some(StepAction::Send),
..Default::default()
};
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0]: `operationId` and `channelPath` are mutually exclusive")
);
}
#[test]
fn async_step_with_action_but_no_target_is_rejected() {
let step = Step {
step_id: "s".into(),
action: Some(StepAction::Send),
..Default::default()
};
assert!(
validate(&step).iter().any(
|e| e == "#.steps[0]: an AsyncAPI step must set `operationId` or `channelPath`"
)
);
}
#[test]
fn operation_path_on_async_step_is_rejected() {
let step = Step {
step_id: "s".into(),
channel_path: Some("$src#/c".into()),
operation_path: Some("$src#/op".into()),
action: Some(StepAction::Send),
..Default::default()
};
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0]: `operationPath` is not valid on an AsyncAPI step")
);
}
#[test]
fn openapi_operation_id_and_path_are_mutually_exclusive() {
let step = Step {
step_id: "s".into(),
operation_id: Some("op".into()),
operation_path: Some("$src#/op".into()),
..Default::default()
};
assert!(
validate(&step).iter().any(
|e| e == "#.steps[0]: `operationId` and `operationPath` are mutually exclusive"
)
);
}
#[test]
fn on_failure_actions_are_recursed() {
let step = Step {
step_id: "s".into(),
workflow_id: Some("wf".into()),
on_failure: vec![ReusableOr::Reusable(
crate::common::reusable::Reusable::default(),
)],
..Default::default()
};
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0].onFailure[0].reference: must not be empty")
);
}
#[test]
fn output_selector_is_validated() {
let step: Step = serde_json::from_value(json!({
"stepId": "s",
"workflowId": "wf",
"outputs": { "id": { "context": "$response.body", "selector": "", "type": "jsonpath" } },
}))
.unwrap();
assert!(
validate(&step)
.iter()
.any(|e| e == "#.steps[0].outputs.id.selector: must not be empty")
);
}
}