use std::{collections::BTreeMap, fmt, iter, pin::Pin, task, time::Duration};
use async_stream::try_stream;
use aws_sdk_cloudformation::{
error::SdkError, operation::describe_stack_events::DescribeStackEventsError,
};
use aws_smithy_types_convert::date_time::DateTimeExt;
use chrono::{DateTime, Utc};
use futures_util::{stream, Stream, TryStreamExt};
use crate::{
status_reason::StatusReason, ResourceStatus, StackEvent, StackEventDetails, StackStatus, Status,
};
const POLL_INTERVAL_STACK_EVENT: Duration = Duration::from_secs(5);
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct StackFailure {
pub stack_id: String,
pub stack_status: StackStatus,
pub stack_status_reason: String,
pub resource_events: Vec<(ResourceStatus, StackEventDetails)>,
}
impl StackFailure {
#[must_use]
pub fn stack_status_reason(&self) -> StatusReason {
StatusReason::new(Some(&self.stack_status_reason))
}
}
impl fmt::Display for StackFailure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Stack operation failed for {}; terminal status: {} ({})",
self.stack_id, self.stack_status, self.stack_status_reason
)?;
if !self.resource_events.is_empty() {
writeln!(f, "\nThe following resources had errors:")?;
}
for (resource_status, details) in &self.resource_events {
write!(
f,
"\n- {} ({}): {} ({})",
details.logical_resource_id,
details.resource_type,
resource_status,
details
.resource_status_reason
.as_deref()
.unwrap_or("no reason reported"),
)?;
}
Ok(())
}
}
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct StackWarning {
pub stack_id: String,
pub resource_events: Vec<(ResourceStatus, StackEventDetails)>,
}
impl fmt::Display for StackWarning {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(
f,
"Stack {} applied successfully but some resources had errors:",
self.stack_id
)?;
for (resource_status, details) in &self.resource_events {
write!(
f,
"\n- {} ({}): {} ({})",
details.logical_resource_id,
details.resource_type,
resource_status,
details
.resource_status_reason
.as_deref()
.unwrap_or("no reason reported")
)?;
}
Ok(())
}
}
pub(crate) enum StackOperationError {
Failure(StackFailure),
Warning(StackWarning),
}
pub(crate) enum StackOperationStatus {
InProgress,
Complete,
Failed,
Unexpected,
}
pub(crate) struct StackOperation<'client, F> {
stack_id: String,
check_progress: F,
events: Pin<
Box<dyn Stream<Item = Result<StackEvent, SdkError<DescribeStackEventsError>>> + 'client>,
>,
stack_error_status: Option<StackStatus>,
stack_error_status_reason: Option<String>,
resource_error_events: Vec<(ResourceStatus, StackEventDetails)>,
}
impl<'client, F> StackOperation<'client, F>
where
F: Fn(StackStatus) -> StackOperationStatus + Unpin,
{
pub(crate) fn new(
client: &'client aws_sdk_cloudformation::Client,
stack_id: String,
started_at: DateTime<Utc>,
check_progress: F,
) -> Self {
let root_stack_id = stack_id.clone();
let events = try_stream! {
let mut interval = tokio::time::interval(POLL_INTERVAL_STACK_EVENT);
let mut since = started_at;
let mut nested_stacks = BTreeMap::<String, String>::new();
loop {
interval.tick().await;
let stack_ids = iter::once(root_stack_id.clone()).chain(nested_stacks.keys().cloned());
let stack_events = stack_ids.into_iter().map(|stack_id| stream::once(Box::pin(async {
let stack_events: Vec<_> = client
.describe_stack_events()
.stack_name(stack_id)
.send()
.await?
.stack_events
.expect("DescribeStackEventsOutput without stack_events")
.into_iter()
.take_while(|event| {
event
.timestamp
.expect("StackEvent without timestamp")
.to_chrono_utc()
.expect("invalid timestamp")
> since
})
.map(|event| {
let stack_alias = event.stack_id().and_then(|stack_id| nested_stacks.get(stack_id)).cloned();
StackEvent::from_sdk(stack_alias, event)
})
.filter(|event| {
match event {
StackEvent::Stack { details, .. } => details.stack_id() == root_stack_id,
StackEvent::Resource{ .. } => true,
}
})
.collect();
Ok::<_, SdkError<DescribeStackEventsError>>(stack_events)
})));
let stack_events: Vec<_> = stream::select_all(stack_events).try_collect().await?;
let stack_events: Vec<_> = stack_events.into_iter().flatten().collect();
if let Some(stack_event) = stack_events.first() {
since = *stack_event.timestamp();
}
for stack_event in stack_events.into_iter().rev() {
let is_terminal = stack_event.is_terminal();
match &stack_event {
StackEvent::Resource {
details: details @ StackEventDetails {
physical_resource_id: Some(nested_stack_id),
..
},
..
} if details.resource_type() == "AWS::CloudFormation::Stack" && !nested_stack_id.is_empty() => {
let stack_alias = nested_stacks
.get(details.stack_id())
.map(String::as_str)
.into_iter()
.chain(iter::once(details.logical_resource_id()))
.collect::<Vec<_>>()
.join("/");
nested_stacks.insert(nested_stack_id.clone(), stack_alias);
},
_ => {},
}
yield stack_event;
if is_terminal {
return;
}
}
}
};
Self {
stack_id,
check_progress,
events: Box::pin(events),
stack_error_status: None,
stack_error_status_reason: None,
resource_error_events: Vec::new(),
}
}
pub(crate) fn verify(self) -> Result<(), StackOperationError> {
if let Some(stack_status) = self.stack_error_status {
return Err(StackOperationError::Failure(StackFailure {
stack_id: self.stack_id,
stack_status,
stack_status_reason: self
.stack_error_status_reason
.expect("stack op failed with no reasons"),
resource_events: self.resource_error_events,
}));
}
if self.resource_error_events.is_empty() {
Ok(())
} else {
Err(StackOperationError::Warning(StackWarning {
stack_id: self.stack_id,
resource_events: self.resource_error_events,
}))
}
}
}
impl<F> Stream for StackOperation<'_, F>
where
F: Fn(StackStatus) -> StackOperationStatus + Unpin,
{
type Item = Result<StackEvent, SdkError<DescribeStackEventsError>>;
fn poll_next(
mut self: Pin<&mut Self>,
ctx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
match self.events.as_mut().poll_next(ctx) {
task::Poll::Pending => task::Poll::Pending,
task::Poll::Ready(None) => task::Poll::Ready(None),
task::Poll::Ready(Some(Err(error))) => task::Poll::Ready(Some(Err(error))),
task::Poll::Ready(Some(Ok(event))) => {
match &event {
StackEvent::Resource {
resource_status,
details,
} => {
if resource_status.sentiment().is_negative() {
self.resource_error_events
.push((*resource_status, details.clone()));
}
}
StackEvent::Stack {
resource_status, ..
} if event.stack_id() == self.stack_id => {
if resource_status.sentiment().is_negative() {
if let Some(reason) = event.resource_status_reason() {
self.stack_error_status_reason.replace(reason.to_string());
}
}
match (self.check_progress)(*resource_status) {
StackOperationStatus::InProgress | StackOperationStatus::Complete => {}
StackOperationStatus::Failed => {
self.stack_error_status = Some(*resource_status);
}
StackOperationStatus::Unexpected => {
panic!("stack has unexpected status for: {}", resource_status);
}
}
}
StackEvent::Stack { .. } => {}
}
task::Poll::Ready(Some(Ok(event)))
}
}
}
}