use std::{convert::TryFrom, fmt, time::Duration};
use aws_sdk_cloudformation::{
client::fluent_builders::CreateChangeSet,
error::DescribeChangeSetError,
model::{Change, ChangeAction},
output::DescribeChangeSetOutput,
types::SdkError,
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use futures_util::TryFutureExt;
use regex::Regex;
use tokio::time::{interval_at, Instant};
use crate::{
stack::{StackOperation, StackOperationStatus},
BlockedStackStatus, Capability, ChangeSetStatus, StackStatus, Tag,
};
const POLL_INTERVAL_CHANGE_SET: Duration = Duration::from_secs(1);
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ChangeSetType {
Create,
Update,
}
impl ChangeSetType {
pub(crate) fn into_sdk(self) -> aws_sdk_cloudformation::model::ChangeSetType {
match self {
ChangeSetType::Create => aws_sdk_cloudformation::model::ChangeSetType::Create,
ChangeSetType::Update => aws_sdk_cloudformation::model::ChangeSetType::Update,
}
}
}
impl fmt::Display for ChangeSetType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Create => write!(f, "CREATE"),
Self::Update => write!(f, "UPDATE"),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
#[allow(clippy::module_name_repetitions)]
pub struct ChangeSet {
pub capabilities: Vec<Capability>,
pub change_set_id: String,
pub change_set_name: String,
pub changes: Vec<ResourceChange>,
pub creation_time: DateTime<Utc>,
pub description: Option<String>,
pub execution_status: ExecutionStatus,
pub notification_arns: Vec<String>,
pub parameters: Vec<Parameter>,
pub stack_id: String,
pub stack_name: String,
pub status: ChangeSetStatus,
pub status_reason: Option<String>,
pub tags: Vec<Tag>,
}
impl ChangeSet {
fn from_sdk(change_set: DescribeChangeSetOutput) -> Self {
Self {
capabilities: change_set
.capabilities
.unwrap_or_default()
.into_iter()
.map(|capability| {
capability
.as_str()
.parse()
.expect("DescribeChangeSetOutput with invalid Capability")
})
.collect(),
change_set_id: change_set
.change_set_id
.expect("DescribeChangeSetOutput without change_set_id"),
change_set_name: change_set
.change_set_name
.expect("DescribeChangeSetOutput without change_set_name"),
changes: change_set
.changes
.unwrap_or_default()
.into_iter()
.map(ResourceChange::from_sdk)
.collect(),
creation_time: change_set
.creation_time
.expect("DescribeChangeSetOutput without creation_time")
.to_chrono_utc(),
description: change_set.description,
execution_status: change_set
.execution_status
.expect("DescribeChangeSetOutput without execution_status")
.as_str()
.parse()
.expect("DescribeChangeSetOutput with invalid execution_status"),
notification_arns: change_set.notification_ar_ns.unwrap_or_default(),
parameters: change_set
.parameters
.unwrap_or_default()
.into_iter()
.map(Parameter::from_sdk)
.collect(),
stack_id: change_set
.stack_id
.expect("DescribeChangeSetOutput without stack_id"),
stack_name: change_set
.stack_name
.expect("DescribeChangeSetOutput without stack_name"),
status: change_set
.status
.expect("DescribeChangeSetOutput without status")
.as_str()
.parse()
.expect("DescribeChangeSetOutput unexpected status"),
status_reason: change_set.status_reason,
tags: change_set
.tags
.unwrap_or_default()
.into_iter()
.map(Tag::from_sdk)
.collect(),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, parse_display::Display, parse_display::FromStr)]
#[display(style = "SNAKE_CASE")]
pub enum ExecutionStatus {
Unavailable,
Available,
ExecuteInProgress,
ExecuteComplete,
ExecuteFailed,
Obsolete,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Parameter {
pub parameter_key: String,
pub parameter_value: Option<String>,
pub use_previous_value: Option<bool>,
pub resolved_value: Option<String>,
}
impl Parameter {
fn from_sdk(param: aws_sdk_cloudformation::model::Parameter) -> Self {
Self {
parameter_key: param
.parameter_key
.expect("Parameter without parameter_key"),
parameter_value: param.parameter_value,
use_previous_value: param.use_previous_value,
resolved_value: param.resolved_value,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResourceChange {
pub action: Action,
pub logical_resource_id: String,
pub physical_resource_id: Option<String>,
pub resource_type: String,
}
impl ResourceChange {
fn from_sdk(change: Change) -> Self {
assert!(
matches!(
change.r#type,
Some(aws_sdk_cloudformation::model::ChangeType::Resource)
),
"Change with unexpected type {:?}",
change.r#type
);
let change = change
.resource_change
.expect("Change without resource_change");
let resource_type = change
.resource_type
.expect("ResourceChange without resource_type");
Self {
action: Action::from_sdk(
&resource_type,
&change.action.expect("ResourceChange without action"),
change.details,
change.replacement,
change.scope,
),
logical_resource_id: change
.logical_resource_id
.expect("ResourceChange without logical_resource_id"),
physical_resource_id: change.physical_resource_id,
resource_type,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Action {
Add,
Modify(ModifyDetail),
Remove,
Import,
Dynamic,
}
impl Action {
fn from_sdk(
resource_type: &str,
action: &ChangeAction,
details: Option<Vec<aws_sdk_cloudformation::model::ResourceChangeDetail>>,
replacement: Option<aws_sdk_cloudformation::model::Replacement>,
scope: Option<Vec<aws_sdk_cloudformation::model::ResourceAttribute>>,
) -> Self {
match action {
ChangeAction::Add
| ChangeAction::Remove
| ChangeAction::Import
| ChangeAction::Dynamic => {
assert!(
matches!(details.as_deref(), None | Some([])),
"ResourceChange with action {:?} and details",
action
);
assert!(
replacement.is_none(),
"ResourceChange with action {:?} and replacement",
action
);
assert!(
scope.unwrap_or_default().is_empty(),
"ResourceChange with action {:?} and scope",
action
);
match action {
ChangeAction::Add => Self::Add,
ChangeAction::Remove => Self::Remove,
ChangeAction::Import => Self::Import,
ChangeAction::Dynamic => Self::Dynamic,
_ => unreachable!(),
}
}
ChangeAction::Modify => Self::Modify(ModifyDetail::from_sdk(
resource_type,
details.expect("ResourceChange with action \"Modify\" without details"),
&replacement.expect("ResourceChange with action \"Modify\" without replacement"),
scope.expect("ResourceChange with action \"Modify\" without scope"),
)),
_ => panic!("ResourceChange with invalid action {:?}", action),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ModifyDetail {
pub details: Vec<ResourceChangeDetail>,
pub replacement: Replacement,
pub scope: EnumSet<ModifyScope>,
}
impl ModifyDetail {
fn from_sdk(
resource_type: &str,
details: Vec<aws_sdk_cloudformation::model::ResourceChangeDetail>,
replacement: &aws_sdk_cloudformation::model::Replacement,
scope: Vec<aws_sdk_cloudformation::model::ResourceAttribute>,
) -> Self {
Self {
details: details
.into_iter()
.map(|detail| ResourceChangeDetail::from_sdk(resource_type, detail))
.collect(),
replacement: replacement
.as_str()
.parse()
.expect("ResourceChange with invalid replacement"),
scope: scope
.into_iter()
.map(|scope| {
scope
.as_str()
.parse::<ModifyScope>()
.expect("ResourceChange with invalid scope")
})
.collect(),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, parse_display::Display, parse_display::FromStr)]
pub enum Replacement {
True,
False,
Conditional,
}
mod modify_scope {
#![allow(clippy::expl_impl_clone_on_copy)]
#[derive(Debug, enumset::EnumSetType, parse_display::Display, parse_display::FromStr)]
#[enumset(no_ops)]
pub enum ModifyScope {
Properties,
Metadata,
CreationPolicy,
UpdatePolicy,
DeletionPolicy,
Tags,
}
}
pub use modify_scope::ModifyScope;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ResourceChangeDetail {
pub change_source: Option<ChangeSource>,
pub evaluation: Evaluation,
pub target: ResourceTargetDefinition,
}
impl ResourceChangeDetail {
fn from_sdk(
resource_type: &str,
details: aws_sdk_cloudformation::model::ResourceChangeDetail,
) -> Self {
let causing_entity = details.causing_entity;
Self {
change_source: details
.change_source
.map(move |change_source| ChangeSource::from_sdk(&change_source, causing_entity)),
evaluation: details
.evaluation
.expect("ResourceChangeDetail without evaluation")
.as_str()
.parse()
.expect("ResourceChangeDetail with invalid evaluation"),
target: ResourceTargetDefinition::from_sdk(
resource_type,
details.target.expect("ResourceChangeDetail without target"),
),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChangeSource {
ResourceReference(
String,
),
ParameterReference(
String,
),
ResourceAttribute(
String,
),
DirectModification,
Automatic,
}
impl ChangeSource {
fn from_sdk(
change_source: &aws_sdk_cloudformation::model::ChangeSource,
causing_entity: Option<String>,
) -> Self {
match change_source {
aws_sdk_cloudformation::model::ChangeSource::ResourceReference
| aws_sdk_cloudformation::model::ChangeSource::ParameterReference
| aws_sdk_cloudformation::model::ChangeSource::ResourceAttribute => {
let causing_entity = causing_entity.unwrap_or_else(|| {
panic!(
"ResourceChangeDetail with change_source {:?} without causing_entity",
change_source
)
});
match change_source {
aws_sdk_cloudformation::model::ChangeSource::ResourceReference => {
Self::ResourceReference(causing_entity)
}
aws_sdk_cloudformation::model::ChangeSource::ParameterReference => {
Self::ParameterReference(causing_entity)
}
aws_sdk_cloudformation::model::ChangeSource::ResourceAttribute => {
Self::ResourceAttribute(causing_entity)
}
_ => unreachable!(),
}
}
aws_sdk_cloudformation::model::ChangeSource::DirectModification => {
Self::DirectModification
}
aws_sdk_cloudformation::model::ChangeSource::Automatic => Self::Automatic,
_ => panic!(
"ResourceChangeDetail with invalid change_source {:?}",
change_source
),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, parse_display::Display, parse_display::FromStr)]
pub enum Evaluation {
Static,
Dynamic,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResourceTargetDefinition {
Properties {
name: String,
requires_recreation: RequiresRecreation,
},
Metadata,
CreationPolicy,
UpdatePolicy,
DeletionPolicy,
Tags,
}
impl ResourceTargetDefinition {
fn from_sdk(
resource_type: &str,
target: aws_sdk_cloudformation::model::ResourceTargetDefinition,
) -> Self {
let attribute = target
.attribute
.expect("ResourceTargetDefinition without attribute");
match attribute {
aws_sdk_cloudformation::model::ResourceAttribute::Properties => Self::Properties {
name: target
.name
.expect("ResourceTargetDefinition with attribute \"Properties\" without name"),
requires_recreation: target
.requires_recreation
.expect(concat!(
"ResourceTargetDefinition with attribute \"Properties\" without ",
"requires_recreation"
))
.as_str()
.parse()
.expect("ResourceTargetDefinition with invalid requires_recreation"),
},
aws_sdk_cloudformation::model::ResourceAttribute::Metadata
| aws_sdk_cloudformation::model::ResourceAttribute::CreationPolicy
| aws_sdk_cloudformation::model::ResourceAttribute::UpdatePolicy
| aws_sdk_cloudformation::model::ResourceAttribute::DeletionPolicy
| aws_sdk_cloudformation::model::ResourceAttribute::Tags => {
assert!(
target.name.is_none(),
"ResourceTargetDefinition with attribute {:?} with name",
attribute
);
assert!(
matches!(
target.requires_recreation,
None | Some(aws_sdk_cloudformation::model::RequiresRecreation::Never)
) || resource_type == "AWS::SecretsManager::Secret",
"ResourceTargetDefinition with attribute {:?} with requires_recreation",
attribute
);
match attribute.as_str() {
"Metadata" => Self::Metadata,
"CreationPolicy" => Self::CreationPolicy,
"UpdatePolicy" => Self::UpdatePolicy,
"DeletionPolicy" => Self::DeletionPolicy,
"Tags" => Self::Tags,
_ => unreachable!(),
}
}
_ => panic!("ResourceTargetDefinition with invalid attribute"),
}
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, parse_display::Display, parse_display::FromStr)]
pub enum RequiresRecreation {
Never,
Conditionally,
Always,
}
pub(crate) struct ChangeSetWithType {
pub(crate) change_set: ChangeSet,
pub(crate) change_set_type: ChangeSetType,
}
pub(crate) enum CreateChangeSetError {
CreateApi(SdkError<aws_sdk_cloudformation::error::CreateChangeSetError>),
PollApi(SdkError<DescribeChangeSetError>),
Blocked { status: BlockedStackStatus },
NoChanges(ChangeSetWithType),
Failed(ChangeSetWithType),
}
impl From<SdkError<aws_sdk_cloudformation::error::CreateChangeSetError>> for CreateChangeSetError {
fn from(error: SdkError<aws_sdk_cloudformation::error::CreateChangeSetError>) -> Self {
if let Some(status) = is_create_blocked(&error) {
Self::Blocked { status }
} else {
Self::CreateApi(error)
}
}
}
impl From<SdkError<DescribeChangeSetError>> for CreateChangeSetError {
fn from(error: SdkError<DescribeChangeSetError>) -> Self {
Self::PollApi(error)
}
}
pub(crate) async fn create_change_set(
client: &aws_sdk_cloudformation::Client,
mut change_set_type: ChangeSetType,
input: CreateChangeSet,
) -> Result<ChangeSetWithType, CreateChangeSetError> {
let change_set = input
.clone()
.send()
.or_else({
let change_set_type = &mut change_set_type;
|error| async move {
match (change_set_type, error) {
(change_set_type @ ChangeSetType::Create, error)
if is_already_exists(&error) =>
{
*change_set_type = ChangeSetType::Update;
input
.change_set_type(change_set_type.into_sdk())
.send()
.await
}
(_, error) => Err(error),
}
}
})
.await?;
let change_set_id = change_set.id.expect("CreateChangeSetOutput without id");
let mut interval = interval_at(
Instant::now() + POLL_INTERVAL_CHANGE_SET,
POLL_INTERVAL_CHANGE_SET,
);
loop {
interval.tick().await;
let change_set = client
.describe_change_set()
.change_set_name(change_set_id.clone())
.send()
.await?;
let change_set = ChangeSet::from_sdk(change_set);
match change_set.status {
ChangeSetStatus::CreatePending | ChangeSetStatus::CreateInProgress => continue,
ChangeSetStatus::CreateComplete => {
return Ok(ChangeSetWithType {
change_set,
change_set_type,
})
}
ChangeSetStatus::Failed if is_no_changes(change_set.status_reason.as_deref()) => {
return Err(CreateChangeSetError::NoChanges(ChangeSetWithType {
change_set,
change_set_type,
}))
}
ChangeSetStatus::Failed => {
return Err(CreateChangeSetError::Failed(ChangeSetWithType {
change_set,
change_set_type,
}))
}
_ => {
panic!(
"change set {} had unexpected status: {}",
change_set.change_set_id, change_set.status
);
}
}
}
}
pub(crate) enum ExecuteChangeSetError {
ExecuteApi(SdkError<aws_sdk_cloudformation::error::ExecuteChangeSetError>),
Blocked { status: BlockedStackStatus },
}
impl From<SdkError<aws_sdk_cloudformation::error::ExecuteChangeSetError>>
for ExecuteChangeSetError
{
fn from(error: SdkError<aws_sdk_cloudformation::error::ExecuteChangeSetError>) -> Self {
Self::ExecuteApi(error)
}
}
pub(crate) async fn execute_change_set(
client: &aws_sdk_cloudformation::Client,
stack_id: String,
change_set_id: String,
change_set_type: ChangeSetType,
disable_rollback: bool,
) -> Result<
StackOperation<'_, impl Fn(StackStatus) -> StackOperationStatus + Unpin>,
ExecuteChangeSetError,
> {
let started_at = Utc::now();
client
.execute_change_set()
.set_disable_rollback(Some(disable_rollback))
.change_set_name(change_set_id)
.send()
.await
.map_err(|error| {
if let Some(status) = is_execute_blocked(&error) {
return ExecuteChangeSetError::Blocked { status };
}
ExecuteChangeSetError::ExecuteApi(error)
})?;
Ok(StackOperation::new(
client,
stack_id,
started_at,
match change_set_type {
ChangeSetType::Create => check_create_progress,
ChangeSetType::Update => check_update_progress,
},
))
}
fn is_already_exists(
error: &SdkError<aws_sdk_cloudformation::error::CreateChangeSetError>,
) -> bool {
error.to_string().contains(" already exists ")
}
fn is_create_blocked(
error: &SdkError<aws_sdk_cloudformation::error::CreateChangeSetError>,
) -> Option<BlockedStackStatus> {
lazy_static::lazy_static! {
static ref BLOCKED: regex::Regex = regex::Regex::new(r"(?i)^Stack:[^ ]* is in (?P<status>[_A-Z]+) state and can not be updated").unwrap();
}
let SdkError::ServiceError { err, .. } = error else {
return None;
};
is_blocked(&BLOCKED, err.message().unwrap())
}
fn is_execute_blocked(
error: &SdkError<aws_sdk_cloudformation::error::ExecuteChangeSetError>,
) -> Option<BlockedStackStatus> {
lazy_static::lazy_static! {
static ref BLOCKED: regex::Regex = regex::Regex::new(r"(?i)^This stack is currently in a non-terminal \[(?P<status>[_A-Z]+)\] state").unwrap();
}
let SdkError::ServiceError { err, .. } = error else {
return None;
};
is_blocked(&BLOCKED, err.message().unwrap())
}
fn is_blocked(pattern: &Regex, message: &str) -> Option<BlockedStackStatus> {
let Some(detail) = pattern.captures(message) else {
return None;
};
let status: StackStatus = detail
.name("status")
.unwrap()
.as_str()
.parse()
.expect("captured invalid status");
let status = BlockedStackStatus::try_from(status).expect("captured non-blocked status");
Some(status)
}
fn is_no_changes(status_reason: Option<&str>) -> bool {
let status_reason = status_reason.unwrap_or_default();
status_reason.contains("The submitted information didn't contain changes.")
|| status_reason.contains("No updates are to be performed.")
}
fn check_create_progress(stack_status: StackStatus) -> StackOperationStatus {
match stack_status {
StackStatus::CreateInProgress | StackStatus::RollbackInProgress => {
StackOperationStatus::InProgress
}
StackStatus::CreateComplete => StackOperationStatus::Complete,
StackStatus::CreateFailed | StackStatus::RollbackFailed | StackStatus::RollbackComplete => {
StackOperationStatus::Failed
}
_ => StackOperationStatus::Unexpected,
}
}
fn check_update_progress(stack_status: StackStatus) -> StackOperationStatus {
match stack_status {
StackStatus::UpdateInProgress
| StackStatus::UpdateCompleteCleanupInProgress
| StackStatus::UpdateRollbackInProgress
| StackStatus::UpdateRollbackCompleteCleanupInProgress => StackOperationStatus::InProgress,
StackStatus::UpdateComplete => StackOperationStatus::Complete,
StackStatus::UpdateFailed
| StackStatus::UpdateRollbackFailed
| StackStatus::UpdateRollbackComplete => StackOperationStatus::Failed,
_ => StackOperationStatus::Unexpected,
}
}