use std::{fmt, future::Future, pin::Pin, task};
use async_stream::try_stream;
use aws_sdk_cloudformation::{
client::fluent_builders::CreateChangeSet, model::Stack, types::SdkError,
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use chrono::{DateTime, Utc};
use futures_util::{Stream, TryFutureExt, TryStreamExt};
use crate::{
change_set::{
create_change_set, execute_change_set, ChangeSet, ChangeSetType, ChangeSetWithType,
CreateChangeSetError, ExecuteChangeSetError,
},
stack::StackOperationError,
BlockedStackStatus, ChangeSetStatus, StackEvent, StackFailure, StackStatus, StackWarning, Tag,
};
/// Settings for an apply-stack operation.
///
/// Create one with `ApplyStackInput::new` and adjust it with the `set_*` builder methods, or
/// assign the public fields directly.
#[derive(Clone, Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct ApplyStackInput {
/// Capabilities sent with the change set request.
pub capabilities: Vec<Capability>,
/// Caller-supplied request token.
///
/// NOTE(review): nothing visible in this file reads this field when building the change set
/// request or executing it — confirm whether it is meant to be forwarded.
pub client_request_token: Option<String>,
/// Passed through to `execute_change_set` when the change set is executed.
pub disable_rollback: bool,
/// Notification ARNs sent with the change set request.
pub notification_arns: Vec<String>,
/// Template parameters sent with the change set request.
pub parameters: Vec<Parameter>,
/// Resource types sent with the change set request; `None` leaves the setting unset.
pub resource_types: Option<Vec<String>>,
/// Role ARN sent with the change set request; `None` leaves the setting unset.
pub role_arn: Option<String>,
/// Name of the stack the change set is created for.
pub stack_name: String,
/// Tags sent with the change set request.
pub tags: Vec<Tag>,
/// Where the template comes from: an inline body or an S3 URL.
pub template_source: TemplateSource,
}
impl ApplyStackInput {
    /// Creates an input for the stack named `stack_name`, using `template_source` as the template.
    ///
    /// Every other setting starts out empty, unset (`None`), or `false`; use the `set_*` methods
    /// to change them.
    pub fn new(stack_name: impl Into<String>, template_source: TemplateSource) -> Self {
        let stack_name = stack_name.into();
        Self {
            capabilities: Vec::new(),
            client_request_token: None,
            disable_rollback: false,
            notification_arns: Vec::new(),
            parameters: Vec::new(),
            resource_types: None,
            role_arn: None,
            stack_name,
            tags: Vec::new(),
            template_source,
        }
    }

    /// Replaces the capabilities.
    #[must_use]
    pub fn set_capabilities(self, capabilities: impl Into<Vec<Capability>>) -> Self {
        Self {
            capabilities: capabilities.into(),
            ..self
        }
    }

    /// Sets the client request token.
    #[must_use]
    pub fn set_client_request_token(self, client_request_token: impl Into<String>) -> Self {
        Self {
            client_request_token: Some(client_request_token.into()),
            ..self
        }
    }

    /// Sets whether rollback is disabled.
    #[must_use]
    pub fn set_disable_rollback(self, disable_rollback: bool) -> Self {
        Self {
            disable_rollback,
            ..self
        }
    }

    /// Replaces the notification ARNs, converting each element into a `String`.
    #[must_use]
    pub fn set_notification_arns<I, S>(self, notification_arns: I) -> Self
    where
        I: Into<Vec<S>>,
        S: Into<String>,
    {
        let arns: Vec<S> = notification_arns.into();
        Self {
            notification_arns: arns.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Replaces the template parameters.
    #[must_use]
    pub fn set_parameters(self, parameters: impl Into<Vec<Parameter>>) -> Self {
        Self {
            parameters: parameters.into(),
            ..self
        }
    }

    /// Sets the resource types, converting each element into a `String`.
    #[must_use]
    pub fn set_resource_types<I, S>(self, resource_types: I) -> Self
    where
        I: Into<Vec<S>>,
        S: Into<String>,
    {
        let types: Vec<S> = resource_types.into();
        Self {
            resource_types: Some(types.into_iter().map(Into::into).collect()),
            ..self
        }
    }

    /// Sets the role ARN.
    #[must_use]
    pub fn set_role_arn(self, role_arn: impl Into<String>) -> Self {
        Self {
            role_arn: Some(role_arn.into()),
            ..self
        }
    }

    /// Replaces the tags.
    #[must_use]
    pub fn set_tags(self, tags: impl Into<Vec<Tag>>) -> Self {
        Self {
            tags: tags.into(),
            ..self
        }
    }

    /// Applies these settings to the `CreateChangeSet` builder `op`.
    ///
    /// Returns the change set type that was requested (always `ChangeSetType::Create` here)
    /// together with the configured builder. The change set name is `apply-stack-` followed by
    /// the current UTC time in milliseconds.
    fn configure(self, op: CreateChangeSet) -> (ChangeSetType, CreateChangeSet) {
        // Destructuring makes it explicit which settings this method does not consume.
        let Self {
            capabilities,
            client_request_token: _,
            disable_rollback: _,
            notification_arns,
            parameters,
            resource_types,
            role_arn,
            stack_name,
            tags,
            template_source,
        } = self;

        let change_set_type = ChangeSetType::Create;

        // Exactly one of template body / template URL is set, depending on the source.
        let (template_body, template_url) = match template_source {
            TemplateSource::S3 { url } => (None, Some(url)),
            TemplateSource::Inline { body } => (Some(body), None),
        };

        let sdk_capabilities = capabilities
            .into_iter()
            .map(Capability::into_sdk)
            .collect();
        let sdk_parameters = parameters.into_iter().map(Parameter::into_sdk).collect();
        let sdk_tags = tags.into_iter().map(Tag::into_sdk).collect();
        let change_set_name = format!("apply-stack-{}", Utc::now().timestamp_millis());

        let input = op
            .set_capabilities(Some(sdk_capabilities))
            .change_set_name(change_set_name)
            .change_set_type(change_set_type.into_sdk())
            .set_notification_ar_ns(Some(notification_arns))
            .set_parameters(Some(sdk_parameters))
            .set_resource_types(resource_types)
            .set_role_arn(role_arn)
            .stack_name(stack_name)
            .set_tags(Some(sdk_tags))
            .set_template_body(template_body)
            .set_template_url(template_url);

        (change_set_type, input)
    }
}
/// A capability that can be listed in `ApplyStackInput::capabilities`.
///
/// Each variant displays as, and parses from, the string given in its `#[display]` attribute.
#[derive(Clone, Copy, Debug, Eq, PartialEq, parse_display::Display, parse_display::FromStr)]
pub enum Capability {
/// Written as `CAPABILITY_IAM`.
#[display("CAPABILITY_IAM")]
Iam,
/// Written as `CAPABILITY_NAMED_IAM`.
#[display("CAPABILITY_NAMED_IAM")]
NamedIam,
/// Written as `CAPABILITY_AUTO_EXPAND`.
#[display("CAPABILITY_AUTO_EXPAND")]
AutoExpand,
}
impl Capability {
    /// Converts this capability into the matching AWS SDK `Capability` variant.
    fn into_sdk(self) -> aws_sdk_cloudformation::model::Capability {
        use aws_sdk_cloudformation::model::Capability as Sdk;

        match self {
            Self::AutoExpand => Sdk::CapabilityAutoExpand,
            Self::NamedIam => Sdk::CapabilityNamedIam,
            Self::Iam => Sdk::CapabilityIam,
        }
    }
}
/// A template parameter, given as a key/value pair.
#[derive(Clone, Debug)]
pub struct Parameter {
/// Sent as the SDK parameter's `parameter_key`.
pub key: String,
/// Sent as the SDK parameter's `parameter_value`.
pub value: String,
}
impl Parameter {
    /// Builds the AWS SDK `Parameter` carrying this key and value.
    fn into_sdk(self) -> aws_sdk_cloudformation::model::Parameter {
        let Self { key, value } = self;
        let builder = aws_sdk_cloudformation::model::Parameter::builder();
        builder.parameter_key(key).parameter_value(value).build()
    }
}
/// Where the template for a stack comes from.
#[derive(Clone, Debug)]
pub enum TemplateSource {
    /// The template text itself.
    Inline { body: String },
    /// A URL pointing at the template.
    S3 { url: String },
}

impl TemplateSource {
    /// Creates an inline template source from the template text `body`.
    #[must_use]
    pub fn inline(body: impl Into<String>) -> Self {
        let body = body.into();
        Self::Inline { body }
    }

    /// Creates an S3 template source from `url`.
    #[must_use]
    pub fn s3(url: impl Into<String>) -> Self {
        let url = url.into();
        Self::S3 { url }
    }
}
/// Description of a stack after an apply-stack operation, built from the SDK's `Stack`.
#[derive(Debug, Eq, PartialEq)]
#[allow(clippy::module_name_repetitions)]
pub struct ApplyStackOutput {
/// The stack's `change_set_id` as reported by the SDK.
pub change_set_id: String,
/// The stack's creation time, converted to `chrono` UTC.
pub creation_time: DateTime<Utc>,
/// The stack's description, if it has one.
pub description: Option<String>,
/// The stack's last-updated time, if the SDK reported one.
pub last_updated_time: Option<DateTime<Utc>>,
/// The stack's outputs; empty when the SDK reported none.
pub outputs: Vec<StackOutput>,
/// The stack's `stack_id` as reported by the SDK.
pub stack_id: String,
/// The stack's name.
pub stack_name: String,
/// The stack's status, parsed from the SDK status string.
pub stack_status: StackStatus,
/// The stack's tags; empty when the SDK reported none.
pub tags: Vec<Tag>,
}
impl ApplyStackOutput {
fn from_raw(stack: Stack) -> Self {
Self {
change_set_id: stack.change_set_id.expect("Stack without change_set_id"),
creation_time: stack
.creation_time
.expect("Stack without creation_time")
.to_chrono_utc(),
description: stack.description,
last_updated_time: stack
.last_updated_time
.as_ref()
.map(DateTimeExt::to_chrono_utc),
outputs: stack
.outputs
.map(|outputs| {
outputs
.into_iter()
.map(|output| StackOutput {
description: output.description,
export_name: output.export_name,
key: output.output_key.expect("StackOutput without output_key"),
value: output
.output_value
.expect("StackOutput without output_value"),
})
.collect()
})
.unwrap_or_default(),
stack_id: stack.stack_id.expect("Stack without stack_id"),
stack_name: stack.stack_name.expect("Stack without stack_name"),
stack_status: stack
.stack_status
.expect("Stack without stack_status")
.as_str()
.parse()
.expect("invalid stack status"),
tags: stack
.tags
.unwrap_or_default()
.into_iter()
.map(Tag::from_sdk)
.collect(),
}
}
}
/// A single output of a stack.
#[derive(Debug, Eq, PartialEq)]
pub struct StackOutput {
/// The output's description, if it has one.
pub description: Option<String>,
/// The output's export name, if it has one.
pub export_name: Option<String>,
/// The output's key.
pub key: String,
/// The output's value.
pub value: String,
}
/// Errors produced by an apply-stack operation.
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub enum ApplyStackError {
/// An SDK call failed; the boxed value is the SDK error and is exposed via `source()`.
CloudFormationApi(Box<dyn std::error::Error>),
/// The operation could not proceed because the stack is in a blocked state.
Blocked {
/// The blocked status that was reported.
status: BlockedStackStatus,
},
/// The change set failed to create.
CreateChangeSetFailed {
/// ID of the failed change set.
id: String,
/// Terminal status of the change set.
status: ChangeSetStatus,
/// Reason reported alongside the status.
status_reason: String,
},
/// The stack operation finished and verification reported a failure.
Failure(StackFailure),
/// The stack operation finished and verification reported a warning.
Warning {
/// The stack's output, described after the operation completed.
output: ApplyStackOutput,
/// The warning that was reported.
warning: StackWarning,
},
}
impl ApplyStackError {
fn from_sdk_error<E: std::error::Error + 'static>(error: SdkError<E>) -> Self {
Self::CloudFormationApi(error.into())
}
}
impl fmt::Display for ApplyStackError {
    /// Writes a human-readable message for the error.
    ///
    /// `Failure` and `Warning` defer entirely to the wrapped failure/warning's own `Display`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Warning { warning, .. } => write!(f, "{}", warning),
            Self::Failure(failure) => write!(f, "{}", failure),
            Self::CreateChangeSetFailed {
                id,
                status,
                status_reason: reason,
            } => write!(
                f,
                "Change set {} failed to create; terminal status: {} ({})",
                id, status, reason
            ),
            Self::Blocked { status } => write!(
                f,
                "stack operation failed because the stack is in a blocked state: {}",
                status
            ),
            Self::CloudFormationApi(cause) => write!(f, "CloudFormation API error: {}", cause),
        }
    }
}
impl std::error::Error for ApplyStackError {
    /// Returns the wrapped SDK error for `CloudFormationApi`; other variants have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Blocked { .. }
            | Self::CreateChangeSetFailed { .. }
            | Self::Failure(_)
            | Self::Warning { .. } => None,
            Self::CloudFormationApi(cause) => Some(&**cause),
        }
    }
}
/// An in-progress apply-stack operation.
///
/// Await it directly for the final output, or use `change_set()` / `events()` to observe
/// intermediate progress first.
pub struct ApplyStack<'client> {
// Internal events in the order they occur: the change set, then stack events, then the
// output. An `Err` item can appear at any point.
event_stream: Pin<Box<dyn Stream<Item = Result<ApplyStackEvent, ApplyStackError>> + 'client>>,
// Output or error already seen by a poller, held until a future hands it to the caller.
output: Option<Result<ApplyStackOutput, ApplyStackError>>,
}
impl<'client> ApplyStack<'client> {
/// Builds the operation for `input` against `client`.
///
/// The returned value wraps a stream that creates a change set, executes it, relays the
/// resulting stack events, and finally describes the stack to produce the output.
pub(crate) fn new(
client: &'client aws_sdk_cloudformation::Client,
input: ApplyStackInput,
) -> Self {
// Read before `input` is moved into the stream below.
let disable_rollback = input.disable_rollback;
let event_stream = try_stream! {
let (stack_id, change_set_id, change_set_type) =
match create_change_set_internal(client, input).await? {
// A change set was created: report it, then carry on to execute it.
Ok(ChangeSetWithType {
change_set,
change_set_type,
}) => {
let stack_id = change_set.stack_id.clone();
let change_set_id = change_set.change_set_id.clone();
yield ApplyStackEvent::ChangeSet(change_set);
(stack_id, change_set_id, change_set_type)
}
// The change set contained no changes: report it, describe the stack as it is, and
// stop without executing anything.
Err(ChangeSetWithType { change_set, .. }) => {
let stack_id = change_set.stack_id.clone();
yield ApplyStackEvent::ChangeSet(change_set);
let output = describe_output(client, stack_id).await?;
yield ApplyStackEvent::Output(output);
return;
}
};
let mut operation =
execute_change_set(client, stack_id.clone(), change_set_id, change_set_type, disable_rollback)
.await
.map_err(|error| match error {
ExecuteChangeSetError::ExecuteApi(error) => ApplyStackError::from_sdk_error(error),
ExecuteChangeSetError::Blocked { status } => ApplyStackError::Blocked { status },
})?;
// Relay every stack event until the operation's event stream ends.
while let Some(event) = operation
.try_next()
.await
.map_err(ApplyStackError::from_sdk_error)?
{
yield ApplyStackEvent::Event(event);
}
let warning = match operation.verify() {
Err(StackOperationError::Failure(failure)) => {
// `Err(..)?` always takes the error path; `unreachable!()` only gives the arm a type.
Err(ApplyStackError::Failure(failure))?;
unreachable!()
}
Ok(_) => None,
Err(StackOperationError::Warning(warning)) => Some(warning),
};
// The stack is described even when there is a warning, so the warning can carry the output.
let output = describe_output(client, stack_id).await?;
match warning {
Some(warning) => {
Err(ApplyStackError::Warning { output, warning })?;
unreachable!()
}
None => yield ApplyStackEvent::Output(output),
};
};
Self {
event_stream: Box::pin(event_stream),
output: None,
}
}
/// Returns a future that resolves to the change set for this operation.
///
/// # Panics
///
/// The returned future panics if a stack event or the output is seen before the change set,
/// or if the event stream ends without a change set or an error.
pub fn change_set(&mut self) -> ApplyStackChangeSet<'client, '_> {
ApplyStackChangeSet(self)
}
/// Returns a stream of the stack events produced by this operation.
///
/// The stream ends when the output or an error is reached; that result is kept so that
/// awaiting the `ApplyStack` afterwards returns it.
pub fn events(&mut self) -> ApplyStackEvents<'client, '_> {
ApplyStackEvents(self)
}
}
impl Future for ApplyStack<'_> {
    type Output = Result<ApplyStackOutput, ApplyStackError>;

    /// Drives the event stream to completion and resolves to the recorded output or error.
    ///
    /// Change set and stack events are discarded. Panics if the stream ends without an output
    /// or an error having been recorded.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
        use std::task::Poll;

        loop {
            let item = match self.event_stream.as_mut().poll_next(ctx) {
                Poll::Ready(item) => item,
                Poll::Pending => return Poll::Pending,
            };

            match item {
                // Intermediate progress is of no interest when awaiting the final result.
                Some(Ok(ApplyStackEvent::ChangeSet(_) | ApplyStackEvent::Event(_))) => {}
                // Record the result and keep polling until the stream reports its end.
                Some(Ok(ApplyStackEvent::Output(output))) => self.output = Some(Ok(output)),
                Some(Err(error)) => self.output = Some(Err(error)),
                None => {
                    let result = self
                        .output
                        .take()
                        .expect("end of stream without err or output");
                    return Poll::Ready(result);
                }
            }
        }
    }
}
/// Future returned by `ApplyStack::change_set`; resolves to the operation's change set, or to
/// the error that occurred before one was produced.
#[allow(clippy::module_name_repetitions)]
pub struct ApplyStackChangeSet<'client, 'apply>(&'apply mut ApplyStack<'client>);
impl Future for ApplyStackChangeSet<'_, '_> {
    type Output = Result<ChangeSet, ApplyStackError>;

    /// Polls the underlying event stream until the change set (or an error) is available.
    ///
    /// Panics if a stack event or the output arrives before the change set, or if the stream
    /// ends with neither a change set nor an error.
    fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
        use std::task::Poll;

        loop {
            let item = match self.0.event_stream.as_mut().poll_next(ctx) {
                Poll::Ready(item) => item,
                Poll::Pending => return Poll::Pending,
            };

            match item {
                Some(Ok(ApplyStackEvent::ChangeSet(change_set))) => {
                    return Poll::Ready(Ok(change_set));
                }
                // Record the error and keep polling; it is returned once the stream ends.
                Some(Err(error)) => {
                    self.0.output = Some(Err(error));
                }
                Some(Ok(ApplyStackEvent::Event(_))) => {
                    panic!("saw stack event before change set");
                }
                Some(Ok(ApplyStackEvent::Output(_))) => {
                    panic!("saw output before change set");
                }
                None => {
                    return match self.0.output.take() {
                        Some(Err(error)) => Poll::Ready(Err(error)),
                        Some(Ok(_)) => panic!("saw output before change set"),
                        None => panic!("end of stream without change set"),
                    };
                }
            }
        }
    }
}
/// Stream returned by `ApplyStack::events`; yields the operation's stack events and ends when
/// the output or an error is reached.
#[allow(clippy::module_name_repetitions)]
pub struct ApplyStackEvents<'client, 'apply>(&'apply mut ApplyStack<'client>);
impl Stream for ApplyStackEvents<'_, '_> {
    type Item = StackEvent;

    /// Yields the next stack event, skipping the change set.
    ///
    /// When the output or an error is reached it is stored on the parent `ApplyStack` and this
    /// stream ends.
    fn poll_next(
        mut self: Pin<&mut Self>,
        ctx: &mut task::Context,
    ) -> task::Poll<Option<Self::Item>> {
        use std::task::Poll;

        loop {
            let item = match self.0.event_stream.as_mut().poll_next(ctx) {
                Poll::Ready(Some(item)) => item,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            };

            // Anything other than a change set or a stack event is a terminal result.
            let outcome = match item {
                Ok(ApplyStackEvent::ChangeSet(_)) => continue,
                Ok(ApplyStackEvent::Event(event)) => return Poll::Ready(Some(event)),
                Ok(ApplyStackEvent::Output(output)) => Ok(output),
                Err(error) => Err(error),
            };

            self.0.output = Some(outcome);
            return Poll::Ready(None);
        }
    }
}
/// Items carried by `ApplyStack`'s internal event stream.
enum ApplyStackEvent {
/// The change set that was created for the operation.
ChangeSet(ChangeSet),
/// A stack event emitted while the change set executes.
Event(StackEvent),
/// The final description of the stack.
Output(ApplyStackOutput),
}
async fn create_change_set_internal(
client: &aws_sdk_cloudformation::Client,
input: ApplyStackInput,
) -> Result<Result<ChangeSetWithType, ChangeSetWithType>, ApplyStackError> {
let (change_set_type, input) = input.configure(client.create_change_set());
let error = match create_change_set(client, change_set_type, input).await {
Ok(change_set) => return Ok(Ok(change_set)),
Err(error) => error,
};
match error {
CreateChangeSetError::NoChanges(change_set) => Ok(Err(change_set)),
CreateChangeSetError::CreateApi(error) => Err(ApplyStackError::from_sdk_error(error)),
CreateChangeSetError::PollApi(error) => Err(ApplyStackError::from_sdk_error(error)),
CreateChangeSetError::Blocked { status } => Err(ApplyStackError::Blocked { status }),
CreateChangeSetError::Failed(ChangeSetWithType { change_set, .. }) => {
Err(ApplyStackError::CreateChangeSetFailed {
id: change_set.change_set_id,
status: change_set.status,
status_reason: change_set
.status_reason
.expect("ChangeSet failed without reason"),
})
}
}
}
/// Describes the stack identified by `stack_id` and converts it into an `ApplyStackOutput`.
///
/// # Panics
///
/// Panics if the response has no stack list or the list is empty (and wherever
/// `ApplyStackOutput::from_raw` panics).
async fn describe_output(
    client: &aws_sdk_cloudformation::Client,
    stack_id: String,
) -> Result<ApplyStackOutput, ApplyStackError> {
    let response = client
        .describe_stacks()
        .stack_name(stack_id)
        .send()
        .map_err(ApplyStackError::from_sdk_error)
        .await?;

    let mut stacks = response
        .stacks
        .expect("DescribeStacksOutput without stacks");
    let stack = stacks.pop().expect("DescribeStacksOutput empty stacks");

    Ok(ApplyStackOutput::from_raw(stack))
}
#[cfg(test)]
mod tests {
    use super::Capability;

    /// Every capability must display as, and parse back from, its CloudFormation name.
    #[test]
    fn test_parse_display() {
        let cases = [
            (Capability::Iam, "CAPABILITY_IAM"),
            (Capability::NamedIam, "CAPABILITY_NAMED_IAM"),
            (Capability::AutoExpand, "CAPABILITY_AUTO_EXPAND"),
        ];

        for &(capability, name) in &cases {
            assert_eq!(capability.to_string(), name);
            assert_eq!(capability, name.parse().unwrap());
        }
    }
}