1use crate::types::{ClientRequestToken, StackId};
4use aws_sdk_cloudformation::types::{ResourceStatus, StackEvent};
5
6pub(crate) struct Poll {
7 pub(crate) client_request_token: Option<ClientRequestToken>,
8 pub(crate) stack_id: StackId,
9 pub(crate) start_condition: fn(&StackEvent) -> bool,
10 pub(crate) stop_condition: fn(&StackEvent) -> bool,
11}
12
13impl Poll {
14 pub(crate) fn default(stack_id: StackId) -> Poll {
15 Self {
16 client_request_token: None,
17 stack_id,
18 start_condition: (|_event| false),
19 stop_condition: (|_event| false),
20 }
21 }
22
23 pub(crate) fn wait_for_remote_operation(
24 remote_operation: crate::instance_spec::RemoteOperation,
25 ) -> Poll {
26 fn is_root_stack_resource_event(stack_event: &StackEvent) -> bool {
27 stack_event.resource_type.as_deref() == Some("AWS::CloudFormation::Stack")
28 && stack_event.stack_id == stack_event.physical_resource_id
29 }
30
31 fn is_known_unknown(value: &str) -> bool {
32 match value {
33 "UPDATE_COMPLETE_CLEANUP_IN_PROGRESS" => false,
35 "UPDATE_ROLLBACK_COMPLETE_CLEANUP_IN_PROGRESS" => false,
36 _other => panic!("Unexpected resource status: {value:#?}"),
37 }
38 }
39
40 fn is_initial(stack_event: &StackEvent) -> bool {
41 if is_root_stack_resource_event(stack_event) {
42 match &stack_event.resource_status.as_ref().unwrap() {
43 ResourceStatus::CreateComplete => false,
44 ResourceStatus::DeleteComplete => false,
45 ResourceStatus::UpdateComplete => false,
46 ResourceStatus::UpdateRollbackComplete => false,
47 ResourceStatus::RollbackComplete => false,
48 ResourceStatus::CreateInProgress => true,
49 ResourceStatus::DeleteInProgress => true,
50 ResourceStatus::RollbackInProgress => true,
51 ResourceStatus::UpdateInProgress => false,
52 ResourceStatus::UpdateRollbackInProgress => true,
53 unknown => is_known_unknown(unknown.as_str()),
54 }
55 } else {
56 false
57 }
58 }
59
60 fn is_final(stack_event: &StackEvent) -> bool {
61 if is_root_stack_resource_event(stack_event) {
62 match &stack_event.resource_status.as_ref().unwrap() {
63 ResourceStatus::CreateComplete => true,
64 ResourceStatus::DeleteComplete => true,
65 ResourceStatus::UpdateComplete => true,
66 ResourceStatus::UpdateRollbackComplete => true,
67 ResourceStatus::RollbackComplete => true,
68 ResourceStatus::CreateInProgress => false,
69 ResourceStatus::DeleteInProgress => false,
70 ResourceStatus::RollbackInProgress => false,
71 ResourceStatus::UpdateInProgress => false,
72 ResourceStatus::UpdateRollbackInProgress => false,
73 unknown => is_known_unknown(unknown.as_str()),
74 }
75 } else {
76 false
77 }
78 }
79
80 Self {
81 client_request_token: Some(remote_operation.client_request_token),
82 stack_id: remote_operation.stack_id,
83 start_condition: is_initial,
84 stop_condition: is_final,
85 }
86 }
87}
88
89enum ReadPagesStatus {
90 Continue,
91 Stop,
92}
93
94impl Poll {
95 pub(crate) async fn run(
96 &self,
97 cloudformation: &aws_sdk_cloudformation::client::Client,
98 action: fn(&StackEvent),
99 ) {
100 let mut initial_pages = vec![];
101
102 self.read_pages(cloudformation, |stack_events| {
103 let mut page = vec![];
104
105 let mut status = ReadPagesStatus::Continue;
106
107 for stack_event in stack_events.iter() {
108 if self.allow_client_request_token(stack_event) {
109 page.push(stack_event.clone());
110 if (self.start_condition)(stack_event) {
111 status = ReadPagesStatus::Stop;
112 break;
113 }
114 }
115 }
116
117 initial_pages.push(page);
118
119 status
120 })
121 .await;
122
123 for stack_events in initial_pages.iter().rev() {
124 for stack_event in stack_events.iter().rev() {
125 action(stack_event);
126 if (self.stop_condition)(stack_event) {
127 return;
128 }
129 }
130 }
131
132 let mut youngest: Option<String> = initial_pages
133 .first()
134 .unwrap()
135 .first()
136 .unwrap()
137 .event_id
138 .clone();
139
140 loop {
141 let mut new_pages = vec![];
142
143 self.read_pages(cloudformation, |stack_events| {
144 let mut page = vec![];
145
146 let mut status = ReadPagesStatus::Continue;
147
148 for stack_event in stack_events.iter() {
149 if stack_event.event_id == youngest {
150 status = ReadPagesStatus::Stop;
151 break;
152 }
153 page.push(stack_event.clone())
154 }
155
156 if !page.is_empty() {
157 new_pages.push(page);
158 }
159
160 status
161 })
162 .await;
163
164 for stack_events in new_pages.iter().rev() {
165 for stack_event in stack_events.iter().rev() {
166 if self.allow_client_request_token(stack_event) {
167 action(stack_event);
168 if (self.stop_condition)(stack_event) {
169 return;
170 }
171 }
172 }
173 }
174
175 if !new_pages.is_empty() {
176 youngest = new_pages.first().unwrap().first().unwrap().event_id.clone();
177 }
178
179 tokio::time::sleep(std::time::Duration::new(1, 0)).await
180 }
181 }
182
183 async fn read_pages(
184 &self,
185 cloudformation: &aws_sdk_cloudformation::client::Client,
186 mut process_page: impl FnMut(Vec<StackEvent>) -> ReadPagesStatus,
187 ) {
188 let mut next_token = None;
189
190 loop {
191 let output = cloudformation
192 .describe_stack_events()
193 .stack_name(&self.stack_id)
194 .set_next_token(next_token)
195 .send()
196 .await
197 .unwrap();
198
199 match process_page(output.stack_events.unwrap()) {
200 ReadPagesStatus::Continue => (),
201 ReadPagesStatus::Stop => break,
202 }
203
204 match output.next_token {
205 None => break,
206 Some(output_next_token) => next_token = Some(output_next_token),
207 }
208 }
209 }
210
211 fn allow_client_request_token(&self, stack_event: &StackEvent) -> bool {
212 match (
213 &self.client_request_token,
214 &stack_event.client_request_token,
215 ) {
216 (Some(ClientRequestToken(expected)), Some(provided)) => expected == provided,
217 _other => true,
218 }
219 }
220}
221
222pub(crate) fn print_event(stack_event: &StackEvent) {
223 log::log!(
224 log_level(stack_event),
225 "{} {} {} {} {}",
226 stack_event
227 .timestamp
228 .map(|value| value
229 .fmt(aws_sdk_cloudformation::primitives::DateTimeFormat::DateTime)
230 .unwrap())
231 .unwrap_or_else(|| "[event-time-missing]".to_string()),
232 stack_event
233 .physical_resource_id
234 .as_deref()
235 .unwrap_or("[unknown-physical-resource-id]"),
236 stack_event
237 .logical_resource_id
238 .as_deref()
239 .unwrap_or("[unknown-logical-resource-id]"),
240 stack_event
241 .resource_type
242 .as_deref()
243 .unwrap_or("[unknown-resource-type]"),
244 stack_event
245 .resource_status
246 .as_ref()
247 .map(|value| value.as_str())
248 .unwrap_or("[unknown-resource-status]"),
249 );
250
251 if let Some(ref message) = stack_event.resource_status_reason {
252 log::info!("- {message}")
253 }
254}
255
256fn log_level(stack_event: &StackEvent) -> log::Level {
257 match &stack_event.resource_status {
258 Some(resource_status) => match resource_status {
259 ResourceStatus::CreateComplete => log::Level::Info,
260 ResourceStatus::CreateFailed => log::Level::Error,
261 ResourceStatus::CreateInProgress => log::Level::Info,
262 ResourceStatus::DeleteComplete => log::Level::Info,
263 ResourceStatus::DeleteFailed => log::Level::Error,
264 ResourceStatus::DeleteInProgress => log::Level::Info,
265 ResourceStatus::DeleteSkipped => log::Level::Warn,
266 ResourceStatus::ExportComplete => log::Level::Info,
267 ResourceStatus::ExportFailed => log::Level::Error,
268 ResourceStatus::ExportRollbackComplete => log::Level::Warn,
269 ResourceStatus::ExportRollbackFailed => log::Level::Error,
270 ResourceStatus::ExportRollbackInProgress => log::Level::Warn,
271 ResourceStatus::ImportComplete => log::Level::Info,
272 ResourceStatus::ImportFailed => log::Level::Error,
273 ResourceStatus::ImportRollbackComplete => log::Level::Warn,
274 ResourceStatus::ImportRollbackFailed => log::Level::Error,
275 ResourceStatus::ImportRollbackInProgress => log::Level::Warn,
276 ResourceStatus::RollbackComplete => log::Level::Warn,
277 ResourceStatus::RollbackFailed => log::Level::Error,
278 ResourceStatus::RollbackInProgress => log::Level::Warn,
279 ResourceStatus::UpdateComplete => log::Level::Info,
280 ResourceStatus::UpdateFailed => log::Level::Error,
281 ResourceStatus::UpdateInProgress => log::Level::Info,
282 ResourceStatus::UpdateRollbackComplete => log::Level::Warn,
283 ResourceStatus::UpdateRollbackFailed => log::Level::Error,
284 ResourceStatus::UpdateRollbackInProgress => log::Level::Warn,
285 other => {
286 if other.as_str() == "UPDATE_COMPLETE_CLEANUP_IN_PROGRESS" {
287 log::Level::Info
288 } else {
289 log::Level::Warn
290 }
291 }
292 },
293 None => log::Level::Info,
294 }
295}