use std::{fmt, future::Future, pin::Pin, task};
use async_stream::try_stream;
use aws_sdk_cloudformation::{
client::fluent_builders, error::DescribeStacksError, types::SdkError,
};
use chrono::Utc;
use futures_util::{Stream, TryStreamExt};
use crate::{
stack::{StackOperation, StackOperationError, StackOperationStatus},
StackEvent, StackFailure, StackStatus, StackWarning,
};
#[allow(clippy::module_name_repetitions)]
pub struct DeleteStackInput {
pub client_request_token: Option<String>,
pub retain_resources: Option<Vec<String>>,
pub role_arn: Option<String>,
pub stack_name: String,
}
impl DeleteStackInput {
pub fn new(stack_name: impl Into<String>) -> Self {
Self {
stack_name: stack_name.into(),
client_request_token: None,
retain_resources: None,
role_arn: None,
}
}
#[must_use]
pub fn set_client_request_token(mut self, client_request_token: impl Into<String>) -> Self {
self.client_request_token = Some(client_request_token.into());
self
}
#[must_use]
pub fn set_retain_resources<I, S>(mut self, retain_resources: I) -> Self
where
I: Into<Vec<S>>,
S: Into<String>,
{
self.retain_resources = Some(
retain_resources
.into()
.into_iter()
.map(Into::into)
.collect(),
);
self
}
#[must_use]
pub fn set_role_arn(mut self, role_arn: impl Into<String>) -> Self {
self.role_arn = Some(role_arn.into());
self
}
fn configure(self, input: fluent_builders::DeleteStack) -> fluent_builders::DeleteStack {
input
.set_client_request_token(self.client_request_token)
.set_retain_resources(self.retain_resources)
.set_role_arn(self.role_arn)
.stack_name(self.stack_name)
}
}
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub enum DeleteStackError {
CloudFormationApi(Box<dyn std::error::Error>),
Failure(StackFailure),
Warning(StackWarning),
}
impl DeleteStackError {
fn from_sdk_error<E: std::error::Error + 'static>(error: SdkError<E>) -> Self {
Self::CloudFormationApi(error.into())
}
}
impl fmt::Display for DeleteStackError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::CloudFormationApi(error) => {
write!(f, "CloudFormation API error: {}", error)
}
Self::Failure(failure) => write!(f, "{}", failure),
Self::Warning(warning) => write!(f, "{}", warning),
}
}
}
impl std::error::Error for DeleteStackError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::CloudFormationApi(error) => Some(error.as_ref()),
Self::Failure { .. } | Self::Warning { .. } => None,
}
}
}
pub struct DeleteStack<'client> {
event_stream: Pin<Box<dyn Stream<Item = Result<StackEvent, DeleteStackError>> + 'client>>,
output: Option<Result<(), DeleteStackError>>,
}
impl<'client> DeleteStack<'client> {
pub(crate) fn new(
client: &'client aws_sdk_cloudformation::Client,
input: DeleteStackInput,
) -> Self {
let event_stream = try_stream! {
let stack_id = match describe_stack_id(client, input.stack_name.clone()).await? {
Some(stack_id) => stack_id,
None => return,
};
let started_at = Utc::now();
input.configure(client.delete_stack()).send()
.await
.map_err(DeleteStackError::from_sdk_error)?;
let mut operation =
StackOperation::new(client, stack_id, started_at, check_operation_status);
while let Some(event) = operation
.try_next()
.await
.map_err(DeleteStackError::from_sdk_error)?
{
yield event;
}
match operation.verify() {
Ok(()) => {}
Err(StackOperationError::Failure(failure)) => {
Err(DeleteStackError::Failure(failure))?;
unreachable!()
}
Err(StackOperationError::Warning(warning)) => {
Err(DeleteStackError::Warning(warning))?;
unreachable!()
}
};
};
Self {
event_stream: Box::pin(event_stream),
output: None,
}
}
pub fn events(&mut self) -> DeleteStackEvents<'client, '_> {
DeleteStackEvents(self)
}
fn poll_next_internal(&mut self, ctx: &mut task::Context) -> task::Poll<Option<StackEvent>> {
match self.event_stream.as_mut().poll_next(ctx) {
task::Poll::Pending => task::Poll::Pending,
task::Poll::Ready(None) => {
self.output.get_or_insert(Ok(()));
task::Poll::Ready(None)
}
task::Poll::Ready(Some(Ok(event))) => task::Poll::Ready(Some(event)),
task::Poll::Ready(Some(Err(error))) => {
self.output.replace(Err(error));
task::Poll::Ready(None)
}
}
}
}
impl Future for DeleteStack<'_> {
type Output = Result<(), DeleteStackError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context) -> task::Poll<Self::Output> {
loop {
match self.poll_next_internal(ctx) {
task::Poll::Pending => return task::Poll::Pending,
task::Poll::Ready(None) => {
return task::Poll::Ready(
self.output
.take()
.expect("end of stream without err or output"),
)
}
task::Poll::Ready(Some(_)) => continue,
}
}
}
}
#[allow(clippy::module_name_repetitions)]
pub struct DeleteStackEvents<'client, 'delete>(&'delete mut DeleteStack<'client>);
impl Stream for DeleteStackEvents<'_, '_> {
type Item = StackEvent;
fn poll_next(
mut self: Pin<&mut Self>,
ctx: &mut task::Context,
) -> task::Poll<Option<Self::Item>> {
self.0.poll_next_internal(ctx)
}
}
async fn describe_stack_id(
client: &aws_sdk_cloudformation::Client,
stack_name: String,
) -> Result<Option<String>, DeleteStackError> {
let output = match client.describe_stacks().stack_name(stack_name).send().await {
Ok(output) => output,
Err(error) if is_not_exists(&error) => return Ok(None),
Err(error) => return Err(DeleteStackError::from_sdk_error(error)),
};
let stack = output
.stacks
.expect("DescribeStacksOutput without stacks")
.pop()
.expect("DescribeStacksOutput empty stacks");
if stack.stack_status == Some(aws_sdk_cloudformation::model::StackStatus::DeleteComplete) {
Ok(None)
} else {
Ok(Some(stack.stack_id.expect("Stack without stack_id")))
}
}
fn check_operation_status(stack_status: StackStatus) -> StackOperationStatus {
match stack_status {
StackStatus::DeleteInProgress => StackOperationStatus::InProgress,
StackStatus::DeleteComplete => StackOperationStatus::Complete,
StackStatus::DeleteFailed => StackOperationStatus::Failed,
_ => StackOperationStatus::Unexpected,
}
}
fn is_not_exists(error: &SdkError<DescribeStacksError>) -> bool {
error.to_string().contains("does not exist")
}