1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde_json::Value;
6use winterbaume_core::{
7 BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, default_account_id,
8 json_error_response,
9};
10
11use crate::cfn_schema::ShapeContext;
12use crate::state::{CloudControlError, CloudControlState};
13use crate::types::ResourceRequest;
14use crate::views::CloudControlStateView;
15use crate::wire;
16
17pub struct CloudControlService {
18 pub(crate) state: Arc<BackendState<CloudControlState>>,
19 pub(crate) notifier: StateChangeNotifier<CloudControlStateView>,
20}
21
22impl CloudControlService {
23 pub fn new() -> Self {
24 Self {
25 state: Arc::new(BackendState::new()),
26 notifier: StateChangeNotifier::new(),
27 }
28 }
29}
30
31impl Default for CloudControlService {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl MockService for CloudControlService {
38 fn service_name(&self) -> &str {
39 "cloudcontrolapi"
40 }
41
42 fn url_patterns(&self) -> Vec<&str> {
43 vec![
44 r"https?://cloudcontrolapi\..*\.amazonaws\.com",
45 r"https?://cloudcontrolapi\.amazonaws\.com",
46 ]
47 }
48
49 fn handle(
50 &self,
51 request: MockRequest,
52 ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
53 Box::pin(async move { self.dispatch(request).await })
54 }
55}
56
57fn service_error_response(err: &CloudControlError) -> MockResponse {
60 let (status, error_type) = match err {
61 CloudControlError::AlreadyExists { .. } => (400, "AlreadyExistsException"),
62 CloudControlError::ResourceNotFound { .. } => (404, "ResourceNotFoundException"),
63 CloudControlError::RequestTokenNotFound { .. } => (404, "RequestTokenNotFoundException"),
64 CloudControlError::TypeNotFound { .. } => (404, "TypeNotFoundException"),
65 CloudControlError::InvalidRequest { .. } => (400, "InvalidRequestException"),
66 CloudControlError::NotCancellable { .. } => (400, "ConcurrentModificationException"),
67 };
68 json_error_response(status, error_type, &err.to_string())
69}
70
/// Action names after which `dispatch` sends a state-change notification,
/// provided the response status is 2xx.
///
/// Only membership is tested, so the order of the entries is not significant.
const MUTATING_ACTIONS: &[&str] = &[
    "CancelResourceRequest",
    "CreateResource",
    "DeleteResource",
    "UpdateResource",
];
78
79impl CloudControlService {
80 async fn dispatch(&self, request: MockRequest) -> MockResponse {
81 let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
82 let account_id = default_account_id();
83
84 let action = request
85 .headers
86 .get("x-amz-target")
87 .and_then(|v| v.to_str().ok())
88 .and_then(|v| v.split('.').next_back())
89 .map(|s| s.to_string());
90
91 let action = match action {
92 Some(a) => a,
93 None => {
94 return json_error_response(400, "MissingAction", "Missing X-Amz-Target header");
95 }
96 };
97
98 if serde_json::from_slice::<Value>(&request.body).is_err() {
99 return json_error_response(400, "SerializationException", "Invalid JSON body");
100 }
101 let body_bytes: &[u8] = &request.body;
102
103 let state = self.state.get(account_id, ®ion);
104
105 let ctx = ShapeContext {
106 region: ®ion,
107 account_id,
108 };
109
110 let response = match action.as_str() {
111 "CreateResource" => self.handle_create_resource(&state, body_bytes, &ctx).await,
112 "DeleteResource" => self.handle_delete_resource(&state, body_bytes).await,
113 "UpdateResource" => self.handle_update_resource(&state, body_bytes, &ctx).await,
114 "GetResource" => self.handle_get_resource(&state, body_bytes).await,
115 "ListResources" => self.handle_list_resources(&state, body_bytes).await,
116 "GetResourceRequestStatus" => {
117 self.handle_get_resource_request_status(&state, body_bytes)
118 .await
119 }
120 "ListResourceRequests" => self.handle_list_resource_requests(&state, body_bytes).await,
121 "CancelResourceRequest" => {
122 self.handle_cancel_resource_request(&state, body_bytes)
123 .await
124 }
125 _ => json_error_response(
126 501,
127 "NotImplementedError",
128 &format!(
129 "{} is not yet implemented in winterbaume-cloudcontrol",
130 action
131 ),
132 ),
133 };
134
135 if MUTATING_ACTIONS.contains(&action.as_str()) && response.status / 100 == 2 {
137 use winterbaume_core::StatefulService;
138 self.notify_state_changed(account_id, ®ion).await;
139 }
140
141 response
142 }
143
144 async fn handle_create_resource(
145 &self,
146 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
147 body: &[u8],
148 ctx: &ShapeContext<'_>,
149 ) -> MockResponse {
150 let input = match wire::deserialize_create_resource_request(body) {
151 Ok(v) => v,
152 Err(e) => return json_error_response(400, "ValidationException", &e),
153 };
154 if input.type_name.is_empty() {
155 return json_error_response(
156 400,
157 "ValidationException",
158 "Missing required field 'TypeName'",
159 );
160 }
161 if input.desired_state.is_empty() {
162 return json_error_response(
163 400,
164 "ValidationException",
165 "Missing required field 'DesiredState'",
166 );
167 }
168 let type_name = input.type_name.as_str();
169 let desired_state = input.desired_state.as_str();
170
171 let mut guard = state.write().await;
172 match guard.create_resource(type_name, desired_state, ctx) {
173 Ok(request) => create_response(&request),
174 Err(e) => service_error_response(&e),
175 }
176 }
177
178 async fn handle_delete_resource(
179 &self,
180 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
181 body: &[u8],
182 ) -> MockResponse {
183 let input = match wire::deserialize_delete_resource_request(body) {
184 Ok(v) => v,
185 Err(e) => return json_error_response(400, "ValidationException", &e),
186 };
187 if input.type_name.is_empty() {
188 return json_error_response(
189 400,
190 "ValidationException",
191 "Missing required field 'TypeName'",
192 );
193 }
194 if input.identifier.is_empty() {
195 return json_error_response(
196 400,
197 "ValidationException",
198 "Missing required field 'Identifier'",
199 );
200 }
201 let type_name = input.type_name.as_str();
202 let identifier = input.identifier.as_str();
203
204 let mut guard = state.write().await;
205 match guard.delete_resource(type_name, identifier) {
206 Ok(request) => delete_response(&request),
207 Err(e) => service_error_response(&e),
208 }
209 }
210
211 async fn handle_update_resource(
212 &self,
213 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
214 body: &[u8],
215 ctx: &ShapeContext<'_>,
216 ) -> MockResponse {
217 let input = match wire::deserialize_update_resource_request(body) {
218 Ok(v) => v,
219 Err(e) => return json_error_response(400, "ValidationException", &e),
220 };
221 if input.type_name.is_empty() {
222 return json_error_response(
223 400,
224 "ValidationException",
225 "Missing required field 'TypeName'",
226 );
227 }
228 if input.identifier.is_empty() {
229 return json_error_response(
230 400,
231 "ValidationException",
232 "Missing required field 'Identifier'",
233 );
234 }
235 if input.patch_document.is_empty() {
236 return json_error_response(
237 400,
238 "ValidationException",
239 "Missing required field 'PatchDocument'",
240 );
241 }
242 let type_name = input.type_name.as_str();
243 let identifier = input.identifier.as_str();
244 let patch_document = input.patch_document.as_str();
245
246 let mut guard = state.write().await;
247 match guard.update_resource(type_name, identifier, patch_document, ctx) {
248 Ok(request) => update_response(&request),
249 Err(e) => service_error_response(&e),
250 }
251 }
252
253 async fn handle_get_resource(
254 &self,
255 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
256 body: &[u8],
257 ) -> MockResponse {
258 let input = match wire::deserialize_get_resource_request(body) {
259 Ok(v) => v,
260 Err(e) => return json_error_response(400, "ValidationException", &e),
261 };
262 if input.type_name.is_empty() {
263 return json_error_response(
264 400,
265 "ValidationException",
266 "Missing required field 'TypeName'",
267 );
268 }
269 if input.identifier.is_empty() {
270 return json_error_response(
271 400,
272 "ValidationException",
273 "Missing required field 'Identifier'",
274 );
275 }
276 let type_name = input.type_name.as_str();
277 let identifier = input.identifier.as_str();
278
279 let guard = state.read().await;
280 match guard.get_resource(type_name, identifier) {
281 Ok(resource) => wire::serialize_get_resource_response(&wire::GetResourceOutput {
282 type_name: Some(resource.type_name.clone()),
283 resource_description: Some(wire::ResourceDescription {
284 identifier: Some(resource.identifier.clone()),
285 properties: Some(resource.resource_model.clone()),
286 }),
287 }),
288 Err(e) => service_error_response(&e),
289 }
290 }
291
292 async fn handle_list_resources(
293 &self,
294 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
295 body: &[u8],
296 ) -> MockResponse {
297 let input = match wire::deserialize_list_resources_request(body) {
298 Ok(v) => v,
299 Err(e) => return json_error_response(400, "ValidationException", &e),
300 };
301 if input.type_name.is_empty() {
302 return json_error_response(
303 400,
304 "ValidationException",
305 "Missing required field 'TypeName'",
306 );
307 }
308 let type_name = input.type_name.as_str();
309
310 let guard = state.read().await;
311 let resources = guard.list_resources(type_name);
312 let descriptions: Vec<wire::ResourceDescription> = resources
313 .iter()
314 .map(|r| wire::ResourceDescription {
315 identifier: Some(r.identifier.clone()),
316 properties: Some(r.resource_model.clone()),
317 })
318 .collect();
319
320 wire::serialize_list_resources_response(&wire::ListResourcesOutput {
321 type_name: Some(type_name.to_string()),
322 resource_descriptions: Some(descriptions),
323 next_token: None,
324 })
325 }
326
327 async fn handle_get_resource_request_status(
328 &self,
329 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
330 body: &[u8],
331 ) -> MockResponse {
332 let input = match wire::deserialize_get_resource_request_status_request(body) {
333 Ok(v) => v,
334 Err(e) => return json_error_response(400, "ValidationException", &e),
335 };
336 if input.request_token.is_empty() {
337 return json_error_response(
338 400,
339 "ValidationException",
340 "Missing required field 'RequestToken'",
341 );
342 }
343 let request_token = input.request_token.as_str();
344
345 let guard = state.read().await;
346 match guard.get_resource_request_status(request_token) {
347 Ok(request) => wire::serialize_get_resource_request_status_response(
348 &wire::GetResourceRequestStatusOutput {
349 progress_event: Some(build_progress_event(request)),
350 hooks_progress_event: None,
351 },
352 ),
353 Err(e) => service_error_response(&e),
354 }
355 }
356
357 async fn handle_list_resource_requests(
358 &self,
359 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
360 body: &[u8],
361 ) -> MockResponse {
362 let input = match wire::deserialize_list_resource_requests_request(body) {
363 Ok(v) => v,
364 Err(e) => return json_error_response(400, "ValidationException", &e),
365 };
366 let filter = input.resource_request_status_filter;
367 let operation_filter: Option<Vec<String>> =
368 filter.as_ref().and_then(|f| f.operations.clone());
369 let status_filter: Option<Vec<String>> =
370 filter.as_ref().and_then(|f| f.operation_statuses.clone());
371
372 let op_refs: Option<Vec<&str>> = operation_filter
373 .as_ref()
374 .map(|v| v.iter().map(|s| s.as_str()).collect());
375 let status_refs: Option<Vec<&str>> = status_filter
376 .as_ref()
377 .map(|v| v.iter().map(|s| s.as_str()).collect());
378
379 let guard = state.read().await;
380 let requests = guard.list_resource_requests(op_refs.as_deref(), status_refs.as_deref());
381 let summaries: Vec<wire::ProgressEvent> =
382 requests.iter().map(|r| build_progress_event(r)).collect();
383
384 wire::serialize_list_resource_requests_response(&wire::ListResourceRequestsOutput {
385 resource_request_status_summaries: Some(summaries),
386 next_token: None,
387 })
388 }
389
390 async fn handle_cancel_resource_request(
391 &self,
392 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
393 body: &[u8],
394 ) -> MockResponse {
395 let input = match wire::deserialize_cancel_resource_request_request(body) {
396 Ok(v) => v,
397 Err(e) => return json_error_response(400, "ValidationException", &e),
398 };
399 if input.request_token.is_empty() {
400 return json_error_response(
401 400,
402 "ValidationException",
403 "Missing required field 'RequestToken'",
404 );
405 }
406 let request_token = input.request_token.as_str();
407
408 let mut guard = state.write().await;
409 match guard.cancel_resource_request(request_token) {
410 Ok(request) => cancel_response(&request),
411 Err(e) => service_error_response(&e),
412 }
413 }
414}
415
416fn build_progress_event(request: &ResourceRequest) -> wire::ProgressEvent {
418 wire::ProgressEvent {
419 type_name: Some(request.type_name.clone()),
420 identifier: Some(request.identifier.clone()),
421 request_token: Some(request.request_token.clone()),
422 hooks_request_token: None,
423 operation: Some(request.operation.as_str().to_string()),
424 operation_status: Some(request.operation_status.as_str().to_string()),
425 event_time: Some(request.event_time.timestamp() as f64),
426 resource_model: request.resource_model.clone(),
427 status_message: request.status_message.clone(),
428 error_code: request.error_code.clone(),
429 retry_after: None,
430 }
431}
432
433fn create_response(request: &ResourceRequest) -> MockResponse {
435 wire::serialize_create_resource_response(&wire::CreateResourceOutput {
436 progress_event: Some(build_progress_event(request)),
437 })
438}
439
440fn delete_response(request: &ResourceRequest) -> MockResponse {
442 wire::serialize_delete_resource_response(&wire::DeleteResourceOutput {
443 progress_event: Some(build_progress_event(request)),
444 })
445}
446
447fn update_response(request: &ResourceRequest) -> MockResponse {
449 wire::serialize_update_resource_response(&wire::UpdateResourceOutput {
450 progress_event: Some(build_progress_event(request)),
451 })
452}
453
454fn cancel_response(request: &ResourceRequest) -> MockResponse {
456 wire::serialize_cancel_resource_request_response(&wire::CancelResourceRequestOutput {
457 progress_event: Some(build_progress_event(request)),
458 })
459}