Skip to main content

winterbaume_cloudcontrol/
handlers.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde_json::Value;
6use winterbaume_core::{
7    BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, default_account_id,
8    json_error_response,
9};
10
11use crate::cfn_schema::ShapeContext;
12use crate::state::{CloudControlError, CloudControlState};
13use crate::types::ResourceRequest;
14use crate::views::CloudControlStateView;
15use crate::wire;
16
17pub struct CloudControlService {
18    pub(crate) state: Arc<BackendState<CloudControlState>>,
19    pub(crate) notifier: StateChangeNotifier<CloudControlStateView>,
20}
21
22impl CloudControlService {
23    pub fn new() -> Self {
24        Self {
25            state: Arc::new(BackendState::new()),
26            notifier: StateChangeNotifier::new(),
27        }
28    }
29}
30
31impl Default for CloudControlService {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl MockService for CloudControlService {
38    fn service_name(&self) -> &str {
39        "cloudcontrolapi"
40    }
41
42    fn url_patterns(&self) -> Vec<&str> {
43        vec![
44            r"https?://cloudcontrolapi\..*\.amazonaws\.com",
45            r"https?://cloudcontrolapi\.amazonaws\.com",
46        ]
47    }
48
49    fn handle(
50        &self,
51        request: MockRequest,
52    ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
53        Box::pin(async move { self.dispatch(request).await })
54    }
55}
56
57/// Map domain errors to AWS-facing error responses.
58/// Exhaustive match -- no wildcard arm.
59fn service_error_response(err: &CloudControlError) -> MockResponse {
60    let (status, error_type) = match err {
61        CloudControlError::AlreadyExists { .. } => (400, "AlreadyExistsException"),
62        CloudControlError::ResourceNotFound { .. } => (404, "ResourceNotFoundException"),
63        CloudControlError::RequestTokenNotFound { .. } => (404, "RequestTokenNotFoundException"),
64        CloudControlError::TypeNotFound { .. } => (404, "TypeNotFoundException"),
65        CloudControlError::InvalidRequest { .. } => (400, "InvalidRequestException"),
66        CloudControlError::NotCancellable { .. } => (400, "ConcurrentModificationException"),
67    };
68    json_error_response(status, error_type, &err.to_string())
69}
70
/// Operation names treated as state-mutating.
///
/// `dispatch` emits a state-change notification after one of these actions
/// completes with a 2xx response.
const MUTATING_ACTIONS: &[&str] = &[
    "CreateResource",
    "DeleteResource",
    "UpdateResource",
    "CancelResourceRequest",
];
78
79impl CloudControlService {
80    async fn dispatch(&self, request: MockRequest) -> MockResponse {
81        let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
82        let account_id = default_account_id();
83
84        let action = request
85            .headers
86            .get("x-amz-target")
87            .and_then(|v| v.to_str().ok())
88            .and_then(|v| v.split('.').next_back())
89            .map(|s| s.to_string());
90
91        let action = match action {
92            Some(a) => a,
93            None => {
94                return json_error_response(400, "MissingAction", "Missing X-Amz-Target header");
95            }
96        };
97
98        if serde_json::from_slice::<Value>(&request.body).is_err() {
99            return json_error_response(400, "SerializationException", "Invalid JSON body");
100        }
101        let body_bytes: &[u8] = &request.body;
102
103        let state = self.state.get(account_id, &region);
104
105        let ctx = ShapeContext {
106            region: &region,
107            account_id,
108        };
109
110        let response = match action.as_str() {
111            "CreateResource" => self.handle_create_resource(&state, body_bytes, &ctx).await,
112            "DeleteResource" => self.handle_delete_resource(&state, body_bytes).await,
113            "UpdateResource" => self.handle_update_resource(&state, body_bytes, &ctx).await,
114            "GetResource" => self.handle_get_resource(&state, body_bytes).await,
115            "ListResources" => self.handle_list_resources(&state, body_bytes).await,
116            "GetResourceRequestStatus" => {
117                self.handle_get_resource_request_status(&state, body_bytes)
118                    .await
119            }
120            "ListResourceRequests" => self.handle_list_resource_requests(&state, body_bytes).await,
121            "CancelResourceRequest" => {
122                self.handle_cancel_resource_request(&state, body_bytes)
123                    .await
124            }
125            _ => json_error_response(
126                501,
127                "NotImplementedError",
128                &format!(
129                    "{} is not yet implemented in winterbaume-cloudcontrol",
130                    action
131                ),
132            ),
133        };
134
135        // Notify state change for mutating actions
136        if MUTATING_ACTIONS.contains(&action.as_str()) && response.status / 100 == 2 {
137            use winterbaume_core::StatefulService;
138            self.notify_state_changed(account_id, &region).await;
139        }
140
141        response
142    }
143
144    async fn handle_create_resource(
145        &self,
146        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
147        body: &[u8],
148        ctx: &ShapeContext<'_>,
149    ) -> MockResponse {
150        let input = match wire::deserialize_create_resource_request(body) {
151            Ok(v) => v,
152            Err(e) => return json_error_response(400, "ValidationException", &e),
153        };
154        if input.type_name.is_empty() {
155            return json_error_response(
156                400,
157                "ValidationException",
158                "Missing required field 'TypeName'",
159            );
160        }
161        if input.desired_state.is_empty() {
162            return json_error_response(
163                400,
164                "ValidationException",
165                "Missing required field 'DesiredState'",
166            );
167        }
168        let type_name = input.type_name.as_str();
169        let desired_state = input.desired_state.as_str();
170
171        let mut guard = state.write().await;
172        match guard.create_resource(type_name, desired_state, ctx) {
173            Ok(request) => create_response(&request),
174            Err(e) => service_error_response(&e),
175        }
176    }
177
178    async fn handle_delete_resource(
179        &self,
180        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
181        body: &[u8],
182    ) -> MockResponse {
183        let input = match wire::deserialize_delete_resource_request(body) {
184            Ok(v) => v,
185            Err(e) => return json_error_response(400, "ValidationException", &e),
186        };
187        if input.type_name.is_empty() {
188            return json_error_response(
189                400,
190                "ValidationException",
191                "Missing required field 'TypeName'",
192            );
193        }
194        if input.identifier.is_empty() {
195            return json_error_response(
196                400,
197                "ValidationException",
198                "Missing required field 'Identifier'",
199            );
200        }
201        let type_name = input.type_name.as_str();
202        let identifier = input.identifier.as_str();
203
204        let mut guard = state.write().await;
205        match guard.delete_resource(type_name, identifier) {
206            Ok(request) => delete_response(&request),
207            Err(e) => service_error_response(&e),
208        }
209    }
210
211    async fn handle_update_resource(
212        &self,
213        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
214        body: &[u8],
215        ctx: &ShapeContext<'_>,
216    ) -> MockResponse {
217        let input = match wire::deserialize_update_resource_request(body) {
218            Ok(v) => v,
219            Err(e) => return json_error_response(400, "ValidationException", &e),
220        };
221        if input.type_name.is_empty() {
222            return json_error_response(
223                400,
224                "ValidationException",
225                "Missing required field 'TypeName'",
226            );
227        }
228        if input.identifier.is_empty() {
229            return json_error_response(
230                400,
231                "ValidationException",
232                "Missing required field 'Identifier'",
233            );
234        }
235        if input.patch_document.is_empty() {
236            return json_error_response(
237                400,
238                "ValidationException",
239                "Missing required field 'PatchDocument'",
240            );
241        }
242        let type_name = input.type_name.as_str();
243        let identifier = input.identifier.as_str();
244        let patch_document = input.patch_document.as_str();
245
246        let mut guard = state.write().await;
247        match guard.update_resource(type_name, identifier, patch_document, ctx) {
248            Ok(request) => update_response(&request),
249            Err(e) => service_error_response(&e),
250        }
251    }
252
253    async fn handle_get_resource(
254        &self,
255        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
256        body: &[u8],
257    ) -> MockResponse {
258        let input = match wire::deserialize_get_resource_request(body) {
259            Ok(v) => v,
260            Err(e) => return json_error_response(400, "ValidationException", &e),
261        };
262        if input.type_name.is_empty() {
263            return json_error_response(
264                400,
265                "ValidationException",
266                "Missing required field 'TypeName'",
267            );
268        }
269        if input.identifier.is_empty() {
270            return json_error_response(
271                400,
272                "ValidationException",
273                "Missing required field 'Identifier'",
274            );
275        }
276        let type_name = input.type_name.as_str();
277        let identifier = input.identifier.as_str();
278
279        let guard = state.read().await;
280        match guard.get_resource(type_name, identifier) {
281            Ok(resource) => wire::serialize_get_resource_response(&wire::GetResourceOutput {
282                type_name: Some(resource.type_name.clone()),
283                resource_description: Some(wire::ResourceDescription {
284                    identifier: Some(resource.identifier.clone()),
285                    properties: Some(resource.resource_model.clone()),
286                }),
287            }),
288            Err(e) => service_error_response(&e),
289        }
290    }
291
292    async fn handle_list_resources(
293        &self,
294        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
295        body: &[u8],
296    ) -> MockResponse {
297        let input = match wire::deserialize_list_resources_request(body) {
298            Ok(v) => v,
299            Err(e) => return json_error_response(400, "ValidationException", &e),
300        };
301        if input.type_name.is_empty() {
302            return json_error_response(
303                400,
304                "ValidationException",
305                "Missing required field 'TypeName'",
306            );
307        }
308        let type_name = input.type_name.as_str();
309
310        let guard = state.read().await;
311        let resources = guard.list_resources(type_name);
312        let descriptions: Vec<wire::ResourceDescription> = resources
313            .iter()
314            .map(|r| wire::ResourceDescription {
315                identifier: Some(r.identifier.clone()),
316                properties: Some(r.resource_model.clone()),
317            })
318            .collect();
319
320        wire::serialize_list_resources_response(&wire::ListResourcesOutput {
321            type_name: Some(type_name.to_string()),
322            resource_descriptions: Some(descriptions),
323            next_token: None,
324        })
325    }
326
327    async fn handle_get_resource_request_status(
328        &self,
329        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
330        body: &[u8],
331    ) -> MockResponse {
332        let input = match wire::deserialize_get_resource_request_status_request(body) {
333            Ok(v) => v,
334            Err(e) => return json_error_response(400, "ValidationException", &e),
335        };
336        if input.request_token.is_empty() {
337            return json_error_response(
338                400,
339                "ValidationException",
340                "Missing required field 'RequestToken'",
341            );
342        }
343        let request_token = input.request_token.as_str();
344
345        let guard = state.read().await;
346        match guard.get_resource_request_status(request_token) {
347            Ok(request) => wire::serialize_get_resource_request_status_response(
348                &wire::GetResourceRequestStatusOutput {
349                    progress_event: Some(build_progress_event(request)),
350                    hooks_progress_event: None,
351                },
352            ),
353            Err(e) => service_error_response(&e),
354        }
355    }
356
357    async fn handle_list_resource_requests(
358        &self,
359        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
360        body: &[u8],
361    ) -> MockResponse {
362        let input = match wire::deserialize_list_resource_requests_request(body) {
363            Ok(v) => v,
364            Err(e) => return json_error_response(400, "ValidationException", &e),
365        };
366        let filter = input.resource_request_status_filter;
367        let operation_filter: Option<Vec<String>> =
368            filter.as_ref().and_then(|f| f.operations.clone());
369        let status_filter: Option<Vec<String>> =
370            filter.as_ref().and_then(|f| f.operation_statuses.clone());
371
372        let op_refs: Option<Vec<&str>> = operation_filter
373            .as_ref()
374            .map(|v| v.iter().map(|s| s.as_str()).collect());
375        let status_refs: Option<Vec<&str>> = status_filter
376            .as_ref()
377            .map(|v| v.iter().map(|s| s.as_str()).collect());
378
379        let guard = state.read().await;
380        let requests = guard.list_resource_requests(op_refs.as_deref(), status_refs.as_deref());
381        let summaries: Vec<wire::ProgressEvent> =
382            requests.iter().map(|r| build_progress_event(r)).collect();
383
384        wire::serialize_list_resource_requests_response(&wire::ListResourceRequestsOutput {
385            resource_request_status_summaries: Some(summaries),
386            next_token: None,
387        })
388    }
389
390    async fn handle_cancel_resource_request(
391        &self,
392        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
393        body: &[u8],
394    ) -> MockResponse {
395        let input = match wire::deserialize_cancel_resource_request_request(body) {
396            Ok(v) => v,
397            Err(e) => return json_error_response(400, "ValidationException", &e),
398        };
399        if input.request_token.is_empty() {
400            return json_error_response(
401                400,
402                "ValidationException",
403                "Missing required field 'RequestToken'",
404            );
405        }
406        let request_token = input.request_token.as_str();
407
408        let mut guard = state.write().await;
409        match guard.cancel_resource_request(request_token) {
410            Ok(request) => cancel_response(&request),
411            Err(e) => service_error_response(&e),
412        }
413    }
414}
415
416/// Build a ProgressEvent wire type from a ResourceRequest.
417fn build_progress_event(request: &ResourceRequest) -> wire::ProgressEvent {
418    wire::ProgressEvent {
419        type_name: Some(request.type_name.clone()),
420        identifier: Some(request.identifier.clone()),
421        request_token: Some(request.request_token.clone()),
422        hooks_request_token: None,
423        operation: Some(request.operation.as_str().to_string()),
424        operation_status: Some(request.operation_status.as_str().to_string()),
425        event_time: Some(request.event_time.timestamp() as f64),
426        resource_model: request.resource_model.clone(),
427        status_message: request.status_message.clone(),
428        error_code: request.error_code.clone(),
429        retry_after: None,
430    }
431}
432
433/// Build a create response wrapping a ProgressEvent.
434fn create_response(request: &ResourceRequest) -> MockResponse {
435    wire::serialize_create_resource_response(&wire::CreateResourceOutput {
436        progress_event: Some(build_progress_event(request)),
437    })
438}
439
440/// Build a delete response wrapping a ProgressEvent.
441fn delete_response(request: &ResourceRequest) -> MockResponse {
442    wire::serialize_delete_resource_response(&wire::DeleteResourceOutput {
443        progress_event: Some(build_progress_event(request)),
444    })
445}
446
447/// Build an update response wrapping a ProgressEvent.
448fn update_response(request: &ResourceRequest) -> MockResponse {
449    wire::serialize_update_resource_response(&wire::UpdateResourceOutput {
450        progress_event: Some(build_progress_event(request)),
451    })
452}
453
454/// Build a cancel response wrapping a ProgressEvent.
455fn cancel_response(request: &ResourceRequest) -> MockResponse {
456    wire::serialize_cancel_resource_request_response(&wire::CancelResourceRequestOutput {
457        progress_event: Some(build_progress_event(request)),
458    })
459}