1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde_json::Value;
6use winterbaume_core::{
7 BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, default_account_id,
8 json_error_response,
9};
10
11use crate::state::{CloudControlError, CloudControlState};
12use crate::types::ResourceRequest;
13use crate::views::CloudControlStateView;
14use crate::wire;
15
16pub struct CloudControlService {
17 pub(crate) state: Arc<BackendState<CloudControlState>>,
18 pub(crate) notifier: StateChangeNotifier<CloudControlStateView>,
19}
20
21impl CloudControlService {
22 pub fn new() -> Self {
23 Self {
24 state: Arc::new(BackendState::new()),
25 notifier: StateChangeNotifier::new(),
26 }
27 }
28}
29
30impl Default for CloudControlService {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl MockService for CloudControlService {
37 fn service_name(&self) -> &str {
38 "cloudcontrolapi"
39 }
40
41 fn url_patterns(&self) -> Vec<&str> {
42 vec![
43 r"https?://cloudcontrolapi\..*\.amazonaws\.com",
44 r"https?://cloudcontrolapi\.amazonaws\.com",
45 ]
46 }
47
48 fn handle(
49 &self,
50 request: MockRequest,
51 ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
52 Box::pin(async move { self.dispatch(request).await })
53 }
54}
55
56fn service_error_response(err: &CloudControlError) -> MockResponse {
59 let (status, error_type) = match err {
60 CloudControlError::AlreadyExists { .. } => (400, "AlreadyExistsException"),
61 CloudControlError::ResourceNotFound { .. } => (404, "ResourceNotFoundException"),
62 CloudControlError::RequestTokenNotFound { .. } => (404, "RequestTokenNotFoundException"),
63 CloudControlError::TypeNotFound { .. } => (404, "TypeNotFoundException"),
64 CloudControlError::InvalidRequest { .. } => (400, "InvalidRequestException"),
65 CloudControlError::NotCancellable { .. } => (400, "ConcurrentModificationException"),
66 };
67 json_error_response(status, error_type, &err.to_string())
68}
69
70const MUTATING_ACTIONS: &[&str] = &[
72 "CreateResource",
73 "DeleteResource",
74 "UpdateResource",
75 "CancelResourceRequest",
76];
77
78impl CloudControlService {
79 async fn dispatch(&self, request: MockRequest) -> MockResponse {
80 let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
81 let account_id = default_account_id();
82
83 let action = request
84 .headers
85 .get("x-amz-target")
86 .and_then(|v| v.to_str().ok())
87 .and_then(|v| v.split('.').next_back())
88 .map(|s| s.to_string());
89
90 let action = match action {
91 Some(a) => a,
92 None => {
93 return json_error_response(400, "MissingAction", "Missing X-Amz-Target header");
94 }
95 };
96
97 if serde_json::from_slice::<Value>(&request.body).is_err() {
98 return json_error_response(400, "SerializationException", "Invalid JSON body");
99 }
100 let body_bytes: &[u8] = &request.body;
101
102 let state = self.state.get(account_id, ®ion);
103
104 let response = match action.as_str() {
105 "CreateResource" => self.handle_create_resource(&state, body_bytes).await,
106 "DeleteResource" => self.handle_delete_resource(&state, body_bytes).await,
107 "UpdateResource" => self.handle_update_resource(&state, body_bytes).await,
108 "GetResource" => self.handle_get_resource(&state, body_bytes).await,
109 "ListResources" => self.handle_list_resources(&state, body_bytes).await,
110 "GetResourceRequestStatus" => {
111 self.handle_get_resource_request_status(&state, body_bytes)
112 .await
113 }
114 "ListResourceRequests" => self.handle_list_resource_requests(&state, body_bytes).await,
115 "CancelResourceRequest" => {
116 self.handle_cancel_resource_request(&state, body_bytes)
117 .await
118 }
119 _ => json_error_response(
120 501,
121 "NotImplementedError",
122 &format!(
123 "{} is not yet implemented in winterbaume-cloudcontrol",
124 action
125 ),
126 ),
127 };
128
129 if MUTATING_ACTIONS.contains(&action.as_str()) && response.status / 100 == 2 {
131 use winterbaume_core::StatefulService;
132 self.notify_state_changed(account_id, ®ion).await;
133 }
134
135 response
136 }
137
138 async fn handle_create_resource(
139 &self,
140 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
141 body: &[u8],
142 ) -> MockResponse {
143 let input = match wire::deserialize_create_resource_request(body) {
144 Ok(v) => v,
145 Err(e) => return json_error_response(400, "ValidationException", &e),
146 };
147 if input.type_name.is_empty() {
148 return json_error_response(
149 400,
150 "ValidationException",
151 "Missing required field 'TypeName'",
152 );
153 }
154 if input.desired_state.is_empty() {
155 return json_error_response(
156 400,
157 "ValidationException",
158 "Missing required field 'DesiredState'",
159 );
160 }
161 let type_name = input.type_name.as_str();
162 let desired_state = input.desired_state.as_str();
163
164 let mut guard = state.write().await;
165 match guard.create_resource(type_name, desired_state) {
166 Ok(request) => create_response(&request),
167 Err(e) => service_error_response(&e),
168 }
169 }
170
171 async fn handle_delete_resource(
172 &self,
173 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
174 body: &[u8],
175 ) -> MockResponse {
176 let input = match wire::deserialize_delete_resource_request(body) {
177 Ok(v) => v,
178 Err(e) => return json_error_response(400, "ValidationException", &e),
179 };
180 if input.type_name.is_empty() {
181 return json_error_response(
182 400,
183 "ValidationException",
184 "Missing required field 'TypeName'",
185 );
186 }
187 if input.identifier.is_empty() {
188 return json_error_response(
189 400,
190 "ValidationException",
191 "Missing required field 'Identifier'",
192 );
193 }
194 let type_name = input.type_name.as_str();
195 let identifier = input.identifier.as_str();
196
197 let mut guard = state.write().await;
198 match guard.delete_resource(type_name, identifier) {
199 Ok(request) => delete_response(&request),
200 Err(e) => service_error_response(&e),
201 }
202 }
203
204 async fn handle_update_resource(
205 &self,
206 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
207 body: &[u8],
208 ) -> MockResponse {
209 let input = match wire::deserialize_update_resource_request(body) {
210 Ok(v) => v,
211 Err(e) => return json_error_response(400, "ValidationException", &e),
212 };
213 if input.type_name.is_empty() {
214 return json_error_response(
215 400,
216 "ValidationException",
217 "Missing required field 'TypeName'",
218 );
219 }
220 if input.identifier.is_empty() {
221 return json_error_response(
222 400,
223 "ValidationException",
224 "Missing required field 'Identifier'",
225 );
226 }
227 if input.patch_document.is_empty() {
228 return json_error_response(
229 400,
230 "ValidationException",
231 "Missing required field 'PatchDocument'",
232 );
233 }
234 let type_name = input.type_name.as_str();
235 let identifier = input.identifier.as_str();
236 let patch_document = input.patch_document.as_str();
237
238 let mut guard = state.write().await;
239 match guard.update_resource(type_name, identifier, patch_document) {
240 Ok(request) => update_response(&request),
241 Err(e) => service_error_response(&e),
242 }
243 }
244
245 async fn handle_get_resource(
246 &self,
247 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
248 body: &[u8],
249 ) -> MockResponse {
250 let input = match wire::deserialize_get_resource_request(body) {
251 Ok(v) => v,
252 Err(e) => return json_error_response(400, "ValidationException", &e),
253 };
254 if input.type_name.is_empty() {
255 return json_error_response(
256 400,
257 "ValidationException",
258 "Missing required field 'TypeName'",
259 );
260 }
261 if input.identifier.is_empty() {
262 return json_error_response(
263 400,
264 "ValidationException",
265 "Missing required field 'Identifier'",
266 );
267 }
268 let type_name = input.type_name.as_str();
269 let identifier = input.identifier.as_str();
270
271 let guard = state.read().await;
272 match guard.get_resource(type_name, identifier) {
273 Ok(resource) => wire::serialize_get_resource_response(&wire::GetResourceOutput {
274 type_name: Some(resource.type_name.clone()),
275 resource_description: Some(wire::ResourceDescription {
276 identifier: Some(resource.identifier.clone()),
277 properties: Some(resource.resource_model.clone()),
278 }),
279 }),
280 Err(e) => service_error_response(&e),
281 }
282 }
283
284 async fn handle_list_resources(
285 &self,
286 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
287 body: &[u8],
288 ) -> MockResponse {
289 let input = match wire::deserialize_list_resources_request(body) {
290 Ok(v) => v,
291 Err(e) => return json_error_response(400, "ValidationException", &e),
292 };
293 if input.type_name.is_empty() {
294 return json_error_response(
295 400,
296 "ValidationException",
297 "Missing required field 'TypeName'",
298 );
299 }
300 let type_name = input.type_name.as_str();
301
302 let guard = state.read().await;
303 let resources = guard.list_resources(type_name);
304 let descriptions: Vec<wire::ResourceDescription> = resources
305 .iter()
306 .map(|r| wire::ResourceDescription {
307 identifier: Some(r.identifier.clone()),
308 properties: Some(r.resource_model.clone()),
309 })
310 .collect();
311
312 wire::serialize_list_resources_response(&wire::ListResourcesOutput {
313 type_name: Some(type_name.to_string()),
314 resource_descriptions: Some(descriptions),
315 next_token: None,
316 })
317 }
318
319 async fn handle_get_resource_request_status(
320 &self,
321 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
322 body: &[u8],
323 ) -> MockResponse {
324 let input = match wire::deserialize_get_resource_request_status_request(body) {
325 Ok(v) => v,
326 Err(e) => return json_error_response(400, "ValidationException", &e),
327 };
328 if input.request_token.is_empty() {
329 return json_error_response(
330 400,
331 "ValidationException",
332 "Missing required field 'RequestToken'",
333 );
334 }
335 let request_token = input.request_token.as_str();
336
337 let guard = state.read().await;
338 match guard.get_resource_request_status(request_token) {
339 Ok(request) => wire::serialize_get_resource_request_status_response(
340 &wire::GetResourceRequestStatusOutput {
341 progress_event: Some(build_progress_event(request)),
342 hooks_progress_event: None,
343 },
344 ),
345 Err(e) => service_error_response(&e),
346 }
347 }
348
349 async fn handle_list_resource_requests(
350 &self,
351 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
352 body: &[u8],
353 ) -> MockResponse {
354 let input = match wire::deserialize_list_resource_requests_request(body) {
355 Ok(v) => v,
356 Err(e) => return json_error_response(400, "ValidationException", &e),
357 };
358 let filter = input.resource_request_status_filter;
359 let operation_filter: Option<Vec<String>> =
360 filter.as_ref().and_then(|f| f.operations.clone());
361 let status_filter: Option<Vec<String>> =
362 filter.as_ref().and_then(|f| f.operation_statuses.clone());
363
364 let op_refs: Option<Vec<&str>> = operation_filter
365 .as_ref()
366 .map(|v| v.iter().map(|s| s.as_str()).collect());
367 let status_refs: Option<Vec<&str>> = status_filter
368 .as_ref()
369 .map(|v| v.iter().map(|s| s.as_str()).collect());
370
371 let guard = state.read().await;
372 let requests = guard.list_resource_requests(op_refs.as_deref(), status_refs.as_deref());
373 let summaries: Vec<wire::ProgressEvent> =
374 requests.iter().map(|r| build_progress_event(r)).collect();
375
376 wire::serialize_list_resource_requests_response(&wire::ListResourceRequestsOutput {
377 resource_request_status_summaries: Some(summaries),
378 next_token: None,
379 })
380 }
381
382 async fn handle_cancel_resource_request(
383 &self,
384 state: &Arc<tokio::sync::RwLock<CloudControlState>>,
385 body: &[u8],
386 ) -> MockResponse {
387 let input = match wire::deserialize_cancel_resource_request_request(body) {
388 Ok(v) => v,
389 Err(e) => return json_error_response(400, "ValidationException", &e),
390 };
391 if input.request_token.is_empty() {
392 return json_error_response(
393 400,
394 "ValidationException",
395 "Missing required field 'RequestToken'",
396 );
397 }
398 let request_token = input.request_token.as_str();
399
400 let mut guard = state.write().await;
401 match guard.cancel_resource_request(request_token) {
402 Ok(request) => cancel_response(&request),
403 Err(e) => service_error_response(&e),
404 }
405 }
406}
407
408fn build_progress_event(request: &ResourceRequest) -> wire::ProgressEvent {
410 wire::ProgressEvent {
411 type_name: Some(request.type_name.clone()),
412 identifier: Some(request.identifier.clone()),
413 request_token: Some(request.request_token.clone()),
414 hooks_request_token: None,
415 operation: Some(request.operation.as_str().to_string()),
416 operation_status: Some(request.operation_status.as_str().to_string()),
417 event_time: Some(request.event_time.timestamp() as f64),
418 resource_model: request.resource_model.clone(),
419 status_message: request.status_message.clone(),
420 error_code: request.error_code.clone(),
421 retry_after: None,
422 }
423}
424
425fn create_response(request: &ResourceRequest) -> MockResponse {
427 wire::serialize_create_resource_response(&wire::CreateResourceOutput {
428 progress_event: Some(build_progress_event(request)),
429 })
430}
431
432fn delete_response(request: &ResourceRequest) -> MockResponse {
434 wire::serialize_delete_resource_response(&wire::DeleteResourceOutput {
435 progress_event: Some(build_progress_event(request)),
436 })
437}
438
439fn update_response(request: &ResourceRequest) -> MockResponse {
441 wire::serialize_update_resource_response(&wire::UpdateResourceOutput {
442 progress_event: Some(build_progress_event(request)),
443 })
444}
445
446fn cancel_response(request: &ResourceRequest) -> MockResponse {
448 wire::serialize_cancel_resource_request_response(&wire::CancelResourceRequestOutput {
449 progress_event: Some(build_progress_event(request)),
450 })
451}