Skip to main content

winterbaume_cloudcontrol/
handlers.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4
5use serde_json::Value;
6use winterbaume_core::{
7    BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, default_account_id,
8    json_error_response,
9};
10
11use crate::state::{CloudControlError, CloudControlState};
12use crate::types::ResourceRequest;
13use crate::views::CloudControlStateView;
14use crate::wire;
15
16pub struct CloudControlService {
17    pub(crate) state: Arc<BackendState<CloudControlState>>,
18    pub(crate) notifier: StateChangeNotifier<CloudControlStateView>,
19}
20
21impl CloudControlService {
22    pub fn new() -> Self {
23        Self {
24            state: Arc::new(BackendState::new()),
25            notifier: StateChangeNotifier::new(),
26        }
27    }
28}
29
30impl Default for CloudControlService {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl MockService for CloudControlService {
37    fn service_name(&self) -> &str {
38        "cloudcontrolapi"
39    }
40
41    fn url_patterns(&self) -> Vec<&str> {
42        vec![
43            r"https?://cloudcontrolapi\..*\.amazonaws\.com",
44            r"https?://cloudcontrolapi\.amazonaws\.com",
45        ]
46    }
47
48    fn handle(
49        &self,
50        request: MockRequest,
51    ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
52        Box::pin(async move { self.dispatch(request).await })
53    }
54}
55
56/// Map domain errors to AWS-facing error responses.
57/// Exhaustive match -- no wildcard arm.
58fn service_error_response(err: &CloudControlError) -> MockResponse {
59    let (status, error_type) = match err {
60        CloudControlError::AlreadyExists { .. } => (400, "AlreadyExistsException"),
61        CloudControlError::ResourceNotFound { .. } => (404, "ResourceNotFoundException"),
62        CloudControlError::RequestTokenNotFound { .. } => (404, "RequestTokenNotFoundException"),
63        CloudControlError::TypeNotFound { .. } => (404, "TypeNotFoundException"),
64        CloudControlError::InvalidRequest { .. } => (400, "InvalidRequestException"),
65        CloudControlError::NotCancellable { .. } => (400, "ConcurrentModificationException"),
66    };
67    json_error_response(status, error_type, &err.to_string())
68}
69
/// Operation names treated as state-mutating.
///
/// `dispatch` sends a state-change notification after one of these operations
/// returns a 2xx response.
const MUTATING_ACTIONS: &[&str] = &[
    // Resource lifecycle operations.
    "CreateResource",
    "DeleteResource",
    "UpdateResource",
    // Cancelling a tracked resource request.
    "CancelResourceRequest",
];
77
78impl CloudControlService {
79    async fn dispatch(&self, request: MockRequest) -> MockResponse {
80        let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
81        let account_id = default_account_id();
82
83        let action = request
84            .headers
85            .get("x-amz-target")
86            .and_then(|v| v.to_str().ok())
87            .and_then(|v| v.split('.').next_back())
88            .map(|s| s.to_string());
89
90        let action = match action {
91            Some(a) => a,
92            None => {
93                return json_error_response(400, "MissingAction", "Missing X-Amz-Target header");
94            }
95        };
96
97        if serde_json::from_slice::<Value>(&request.body).is_err() {
98            return json_error_response(400, "SerializationException", "Invalid JSON body");
99        }
100        let body_bytes: &[u8] = &request.body;
101
102        let state = self.state.get(account_id, &region);
103
104        let response = match action.as_str() {
105            "CreateResource" => self.handle_create_resource(&state, body_bytes).await,
106            "DeleteResource" => self.handle_delete_resource(&state, body_bytes).await,
107            "UpdateResource" => self.handle_update_resource(&state, body_bytes).await,
108            "GetResource" => self.handle_get_resource(&state, body_bytes).await,
109            "ListResources" => self.handle_list_resources(&state, body_bytes).await,
110            "GetResourceRequestStatus" => {
111                self.handle_get_resource_request_status(&state, body_bytes)
112                    .await
113            }
114            "ListResourceRequests" => self.handle_list_resource_requests(&state, body_bytes).await,
115            "CancelResourceRequest" => {
116                self.handle_cancel_resource_request(&state, body_bytes)
117                    .await
118            }
119            _ => json_error_response(
120                501,
121                "NotImplementedError",
122                &format!(
123                    "{} is not yet implemented in winterbaume-cloudcontrol",
124                    action
125                ),
126            ),
127        };
128
129        // Notify state change for mutating actions
130        if MUTATING_ACTIONS.contains(&action.as_str()) && response.status / 100 == 2 {
131            use winterbaume_core::StatefulService;
132            self.notify_state_changed(account_id, &region).await;
133        }
134
135        response
136    }
137
138    async fn handle_create_resource(
139        &self,
140        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
141        body: &[u8],
142    ) -> MockResponse {
143        let input = match wire::deserialize_create_resource_request(body) {
144            Ok(v) => v,
145            Err(e) => return json_error_response(400, "ValidationException", &e),
146        };
147        if input.type_name.is_empty() {
148            return json_error_response(
149                400,
150                "ValidationException",
151                "Missing required field 'TypeName'",
152            );
153        }
154        if input.desired_state.is_empty() {
155            return json_error_response(
156                400,
157                "ValidationException",
158                "Missing required field 'DesiredState'",
159            );
160        }
161        let type_name = input.type_name.as_str();
162        let desired_state = input.desired_state.as_str();
163
164        let mut guard = state.write().await;
165        match guard.create_resource(type_name, desired_state) {
166            Ok(request) => create_response(&request),
167            Err(e) => service_error_response(&e),
168        }
169    }
170
171    async fn handle_delete_resource(
172        &self,
173        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
174        body: &[u8],
175    ) -> MockResponse {
176        let input = match wire::deserialize_delete_resource_request(body) {
177            Ok(v) => v,
178            Err(e) => return json_error_response(400, "ValidationException", &e),
179        };
180        if input.type_name.is_empty() {
181            return json_error_response(
182                400,
183                "ValidationException",
184                "Missing required field 'TypeName'",
185            );
186        }
187        if input.identifier.is_empty() {
188            return json_error_response(
189                400,
190                "ValidationException",
191                "Missing required field 'Identifier'",
192            );
193        }
194        let type_name = input.type_name.as_str();
195        let identifier = input.identifier.as_str();
196
197        let mut guard = state.write().await;
198        match guard.delete_resource(type_name, identifier) {
199            Ok(request) => delete_response(&request),
200            Err(e) => service_error_response(&e),
201        }
202    }
203
204    async fn handle_update_resource(
205        &self,
206        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
207        body: &[u8],
208    ) -> MockResponse {
209        let input = match wire::deserialize_update_resource_request(body) {
210            Ok(v) => v,
211            Err(e) => return json_error_response(400, "ValidationException", &e),
212        };
213        if input.type_name.is_empty() {
214            return json_error_response(
215                400,
216                "ValidationException",
217                "Missing required field 'TypeName'",
218            );
219        }
220        if input.identifier.is_empty() {
221            return json_error_response(
222                400,
223                "ValidationException",
224                "Missing required field 'Identifier'",
225            );
226        }
227        if input.patch_document.is_empty() {
228            return json_error_response(
229                400,
230                "ValidationException",
231                "Missing required field 'PatchDocument'",
232            );
233        }
234        let type_name = input.type_name.as_str();
235        let identifier = input.identifier.as_str();
236        let patch_document = input.patch_document.as_str();
237
238        let mut guard = state.write().await;
239        match guard.update_resource(type_name, identifier, patch_document) {
240            Ok(request) => update_response(&request),
241            Err(e) => service_error_response(&e),
242        }
243    }
244
245    async fn handle_get_resource(
246        &self,
247        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
248        body: &[u8],
249    ) -> MockResponse {
250        let input = match wire::deserialize_get_resource_request(body) {
251            Ok(v) => v,
252            Err(e) => return json_error_response(400, "ValidationException", &e),
253        };
254        if input.type_name.is_empty() {
255            return json_error_response(
256                400,
257                "ValidationException",
258                "Missing required field 'TypeName'",
259            );
260        }
261        if input.identifier.is_empty() {
262            return json_error_response(
263                400,
264                "ValidationException",
265                "Missing required field 'Identifier'",
266            );
267        }
268        let type_name = input.type_name.as_str();
269        let identifier = input.identifier.as_str();
270
271        let guard = state.read().await;
272        match guard.get_resource(type_name, identifier) {
273            Ok(resource) => wire::serialize_get_resource_response(&wire::GetResourceOutput {
274                type_name: Some(resource.type_name.clone()),
275                resource_description: Some(wire::ResourceDescription {
276                    identifier: Some(resource.identifier.clone()),
277                    properties: Some(resource.resource_model.clone()),
278                }),
279            }),
280            Err(e) => service_error_response(&e),
281        }
282    }
283
284    async fn handle_list_resources(
285        &self,
286        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
287        body: &[u8],
288    ) -> MockResponse {
289        let input = match wire::deserialize_list_resources_request(body) {
290            Ok(v) => v,
291            Err(e) => return json_error_response(400, "ValidationException", &e),
292        };
293        if input.type_name.is_empty() {
294            return json_error_response(
295                400,
296                "ValidationException",
297                "Missing required field 'TypeName'",
298            );
299        }
300        let type_name = input.type_name.as_str();
301
302        let guard = state.read().await;
303        let resources = guard.list_resources(type_name);
304        let descriptions: Vec<wire::ResourceDescription> = resources
305            .iter()
306            .map(|r| wire::ResourceDescription {
307                identifier: Some(r.identifier.clone()),
308                properties: Some(r.resource_model.clone()),
309            })
310            .collect();
311
312        wire::serialize_list_resources_response(&wire::ListResourcesOutput {
313            type_name: Some(type_name.to_string()),
314            resource_descriptions: Some(descriptions),
315            next_token: None,
316        })
317    }
318
319    async fn handle_get_resource_request_status(
320        &self,
321        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
322        body: &[u8],
323    ) -> MockResponse {
324        let input = match wire::deserialize_get_resource_request_status_request(body) {
325            Ok(v) => v,
326            Err(e) => return json_error_response(400, "ValidationException", &e),
327        };
328        if input.request_token.is_empty() {
329            return json_error_response(
330                400,
331                "ValidationException",
332                "Missing required field 'RequestToken'",
333            );
334        }
335        let request_token = input.request_token.as_str();
336
337        let guard = state.read().await;
338        match guard.get_resource_request_status(request_token) {
339            Ok(request) => wire::serialize_get_resource_request_status_response(
340                &wire::GetResourceRequestStatusOutput {
341                    progress_event: Some(build_progress_event(request)),
342                    hooks_progress_event: None,
343                },
344            ),
345            Err(e) => service_error_response(&e),
346        }
347    }
348
349    async fn handle_list_resource_requests(
350        &self,
351        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
352        body: &[u8],
353    ) -> MockResponse {
354        let input = match wire::deserialize_list_resource_requests_request(body) {
355            Ok(v) => v,
356            Err(e) => return json_error_response(400, "ValidationException", &e),
357        };
358        let filter = input.resource_request_status_filter;
359        let operation_filter: Option<Vec<String>> =
360            filter.as_ref().and_then(|f| f.operations.clone());
361        let status_filter: Option<Vec<String>> =
362            filter.as_ref().and_then(|f| f.operation_statuses.clone());
363
364        let op_refs: Option<Vec<&str>> = operation_filter
365            .as_ref()
366            .map(|v| v.iter().map(|s| s.as_str()).collect());
367        let status_refs: Option<Vec<&str>> = status_filter
368            .as_ref()
369            .map(|v| v.iter().map(|s| s.as_str()).collect());
370
371        let guard = state.read().await;
372        let requests = guard.list_resource_requests(op_refs.as_deref(), status_refs.as_deref());
373        let summaries: Vec<wire::ProgressEvent> =
374            requests.iter().map(|r| build_progress_event(r)).collect();
375
376        wire::serialize_list_resource_requests_response(&wire::ListResourceRequestsOutput {
377            resource_request_status_summaries: Some(summaries),
378            next_token: None,
379        })
380    }
381
382    async fn handle_cancel_resource_request(
383        &self,
384        state: &Arc<tokio::sync::RwLock<CloudControlState>>,
385        body: &[u8],
386    ) -> MockResponse {
387        let input = match wire::deserialize_cancel_resource_request_request(body) {
388            Ok(v) => v,
389            Err(e) => return json_error_response(400, "ValidationException", &e),
390        };
391        if input.request_token.is_empty() {
392            return json_error_response(
393                400,
394                "ValidationException",
395                "Missing required field 'RequestToken'",
396            );
397        }
398        let request_token = input.request_token.as_str();
399
400        let mut guard = state.write().await;
401        match guard.cancel_resource_request(request_token) {
402            Ok(request) => cancel_response(&request),
403            Err(e) => service_error_response(&e),
404        }
405    }
406}
407
408/// Build a ProgressEvent wire type from a ResourceRequest.
409fn build_progress_event(request: &ResourceRequest) -> wire::ProgressEvent {
410    wire::ProgressEvent {
411        type_name: Some(request.type_name.clone()),
412        identifier: Some(request.identifier.clone()),
413        request_token: Some(request.request_token.clone()),
414        hooks_request_token: None,
415        operation: Some(request.operation.as_str().to_string()),
416        operation_status: Some(request.operation_status.as_str().to_string()),
417        event_time: Some(request.event_time.timestamp() as f64),
418        resource_model: request.resource_model.clone(),
419        status_message: request.status_message.clone(),
420        error_code: request.error_code.clone(),
421        retry_after: None,
422    }
423}
424
425/// Build a create response wrapping a ProgressEvent.
426fn create_response(request: &ResourceRequest) -> MockResponse {
427    wire::serialize_create_resource_response(&wire::CreateResourceOutput {
428        progress_event: Some(build_progress_event(request)),
429    })
430}
431
432/// Build a delete response wrapping a ProgressEvent.
433fn delete_response(request: &ResourceRequest) -> MockResponse {
434    wire::serialize_delete_resource_response(&wire::DeleteResourceOutput {
435        progress_event: Some(build_progress_event(request)),
436    })
437}
438
439/// Build an update response wrapping a ProgressEvent.
440fn update_response(request: &ResourceRequest) -> MockResponse {
441    wire::serialize_update_resource_response(&wire::UpdateResourceOutput {
442        progress_event: Some(build_progress_event(request)),
443    })
444}
445
446/// Build a cancel response wrapping a ProgressEvent.
447fn cancel_response(request: &ResourceRequest) -> MockResponse {
448    wire::serialize_cancel_resource_request_response(&wire::CancelResourceRequestOutput {
449        progress_event: Some(build_progress_event(request)),
450    })
451}