Skip to main content

winterbaume_appsync/
handlers.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use http::header::HeaderName;
7use serde_json::json;
8use winterbaume_core::{
9    BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, StatefulService,
10    default_account_id,
11};
12
13use crate::state::{AppSyncError, AppSyncState};
14use crate::views::AppsyncStateView;
15use crate::wire;
16
/// Header name constant for the `x-amzn-errortype` HTTP header.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the
/// error-response helpers attach it to error replies — confirm at the use site.
const X_AMZN_ERRORTYPE: HeaderName = HeaderName::from_static("x-amzn-errortype");
18
/// Mock AppSync service: implements `MockService` and routes requests to per-operation
/// handlers that read and mutate the shared backend state.
pub struct AppSyncService {
    /// Shared state store; `dispatch` looks up the entry for an account id and region.
    pub(crate) state: Arc<BackendState<AppSyncState>>,
    /// Notifier parameterised over [`AppsyncStateView`].
    pub(crate) notifier: StateChangeNotifier<AppsyncStateView>,
}
23
24impl AppSyncService {
25    pub fn new() -> Self {
26        Self {
27            state: Arc::new(BackendState::new()),
28            notifier: StateChangeNotifier::new(),
29        }
30    }
31}
32
33impl Default for AppSyncService {
34    fn default() -> Self {
35        Self::new()
36    }
37}
38
39impl MockService for AppSyncService {
40    fn service_name(&self) -> &str {
41        "appsync"
42    }
43
44    fn url_patterns(&self) -> Vec<&str> {
45        vec![r"https?://appsync\.(.+)\.amazonaws\.com"]
46    }
47
48    fn handle(
49        &self,
50        request: MockRequest,
51    ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
52        Box::pin(async move { self.dispatch(request).await })
53    }
54}
55
56impl AppSyncService {
57    async fn dispatch(&self, request: MockRequest) -> MockResponse {
58        let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
59        let account_id = default_account_id();
60        let state = self.state.get(account_id, &region);
61
62        let (path, query_string) = extract_path_and_query(&request.uri);
63        let method = request.method.as_str();
64
65        let segments: Vec<&str> = path.trim_start_matches('/').split('/').collect();
66        let query_map: HashMap<String, String> =
67            winterbaume_core::parse_query_string(&query_string);
68
69        let response = if segments.is_empty() {
70            rest_json_error(404, "UnknownOperationException", "Not found")
71        } else if segments.len() >= 3 && segments[0] == "v1" && segments[1] == "tags" {
72            // ============ v1/tags/{resourceArn+} ============
73            let resource_arn = percent_decode(&segments[2..].join("/"));
74            let labels: &[(&str, &str)] = &[("resourceArn", resource_arn.as_str())];
75            match method {
76                "GET" => {
77                    self.handle_list_tags_for_resource(&state, &request, labels, &query_map)
78                        .await
79                }
80                "POST" => {
81                    self.handle_tag_resource(&state, &request, labels, &query_map)
82                        .await
83                }
84                "DELETE" => {
85                    self.handle_untag_resource(&state, &request, labels, &query_map, &query_string)
86                        .await
87                }
88                _ => rest_json_error(404, "UnknownOperationException", "Not found"),
89            }
90        } else if segments.len() >= 2 && segments[0] == "v2" && segments[1] == "apis" {
91            // ============ v2 routes ============
92            self.dispatch_v2(
93                method, &segments, &request, &query_map, account_id, &region, &state,
94            )
95            .await
96        } else if segments.len() >= 2 && segments[0] == "v1" && segments[1] == "apis" {
97            // ============ v1 routes ============
98            self.dispatch_v1(
99                method, &segments, &request, &query_map, account_id, &region, &state,
100            )
101            .await
102        } else {
103            rest_json_error(404, "UnknownOperationException", "Not found")
104        };
105
106        if matches!(method, "PUT" | "POST" | "DELETE") && response.status / 100 == 2 {
107            self.notify_state_changed(account_id, &region).await;
108        }
109        response
110    }
111
112    #[allow(clippy::too_many_arguments)]
113    async fn dispatch_v2(
114        &self,
115        method: &str,
116        segments: &[&str],
117        request: &MockRequest,
118        query_map: &HashMap<String, String>,
119        account_id: &str,
120        region: &str,
121        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
122    ) -> MockResponse {
123        // v2, apis, ...
124        match (method, segments.len()) {
125            // POST /v2/apis - CreateApi
126            ("POST", 2) => {
127                self.handle_create_api(state, request, &[], query_map, account_id, region)
128                    .await
129            }
130            // GET /v2/apis - ListApis
131            ("GET", 2) => self.handle_list_apis(state, request, &[], query_map).await,
132            // GET /v2/apis/{apiId} - GetApi
133            ("GET", 3) => {
134                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
135                self.handle_get_api(state, request, labels, query_map).await
136            }
137            // DELETE /v2/apis/{apiId} - DeleteApi
138            ("DELETE", 3) => {
139                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
140                self.handle_delete_api(state, request, labels, query_map)
141                    .await
142            }
143            // POST /v2/apis/{apiId}/channelNamespaces - CreateChannelNamespace
144            ("POST", 4) if segments[3] == "channelNamespaces" => {
145                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
146                self.handle_create_channel_namespace(
147                    state, request, labels, query_map, account_id, region,
148                )
149                .await
150            }
151            // GET /v2/apis/{apiId}/channelNamespaces - ListChannelNamespaces
152            ("GET", 4) if segments[3] == "channelNamespaces" => {
153                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
154                self.handle_list_channel_namespaces(state, request, labels, query_map)
155                    .await
156            }
157            // DELETE /v2/apis/{apiId}/channelNamespaces/{name} - DeleteChannelNamespace
158            ("DELETE", 5) if segments[3] == "channelNamespaces" => {
159                let labels: &[(&str, &str)] = &[("apiId", segments[2]), ("name", segments[4])];
160                self.handle_delete_channel_namespace(state, request, labels, query_map)
161                    .await
162            }
163            _ => rest_json_error(404, "UnknownOperationException", "Not found"),
164        }
165    }
166
167    #[allow(clippy::too_many_arguments)]
168    async fn dispatch_v1(
169        &self,
170        method: &str,
171        segments: &[&str],
172        request: &MockRequest,
173        query_map: &HashMap<String, String>,
174        account_id: &str,
175        region: &str,
176        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
177    ) -> MockResponse {
178        // segments: ["v1", "apis", ...]
179        match (method, segments.len()) {
180            // POST /v1/apis - CreateGraphqlApi
181            ("POST", 2) => {
182                self.handle_create_graphql_api(state, request, &[], query_map, account_id, region)
183                    .await
184            }
185            // GET /v1/apis - ListGraphqlApis
186            ("GET", 2) => {
187                self.handle_list_graphql_apis(state, request, &[], query_map)
188                    .await
189            }
190            // GET /v1/apis/{apiId} - GetGraphqlApi
191            ("GET", 3) => {
192                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
193                self.handle_get_graphql_api(state, request, labels, query_map)
194                    .await
195            }
196            // POST /v1/apis/{apiId} - UpdateGraphqlApi
197            ("POST", 3) => {
198                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
199                self.handle_update_graphql_api(state, request, labels, query_map)
200                    .await
201            }
202            // DELETE /v1/apis/{apiId} - DeleteGraphqlApi
203            ("DELETE", 3) => {
204                let labels: &[(&str, &str)] = &[("apiId", segments[2])];
205                self.handle_delete_graphql_api(state, request, labels, query_map)
206                    .await
207            }
208            // Routes with 4 segments: /v1/apis/{apiId}/{sub}
209            (_, 4) => {
210                let api_id = segments[2];
211                let sub = segments[3];
212                let labels_api: &[(&str, &str)] = &[("apiId", api_id)];
213                match (method, sub) {
214                    // POST /v1/apis/{apiId}/ApiCaches - CreateApiCache
215                    ("POST", "ApiCaches") => {
216                        self.handle_create_api_cache(state, request, labels_api, query_map)
217                            .await
218                    }
219                    // GET /v1/apis/{apiId}/ApiCaches - GetApiCache
220                    ("GET", "ApiCaches") => {
221                        self.handle_get_api_cache(state, request, labels_api, query_map)
222                            .await
223                    }
224                    // DELETE /v1/apis/{apiId}/ApiCaches - DeleteApiCache
225                    ("DELETE", "ApiCaches") => {
226                        self.handle_delete_api_cache(state, request, labels_api, query_map)
227                            .await
228                    }
229                    // POST /v1/apis/{apiId}/apikeys - CreateApiKey
230                    ("POST", "apikeys") => {
231                        self.handle_create_api_key(state, request, labels_api, query_map)
232                            .await
233                    }
234                    // GET /v1/apis/{apiId}/apikeys - ListApiKeys
235                    ("GET", "apikeys") => {
236                        self.handle_list_api_keys(state, request, labels_api, query_map)
237                            .await
238                    }
239                    // DELETE /v1/apis/{apiId}/FlushCache - FlushApiCache
240                    ("DELETE", "FlushCache") => {
241                        self.handle_flush_api_cache(state, request, labels_api, query_map)
242                            .await
243                    }
244                    // POST /v1/apis/{apiId}/schemacreation - StartSchemaCreation
245                    ("POST", "schemacreation") => {
246                        self.handle_start_schema_creation(state, request, labels_api, query_map)
247                            .await
248                    }
249                    // GET /v1/apis/{apiId}/schemacreation - GetSchemaCreationStatus
250                    ("GET", "schemacreation") => {
251                        self.handle_get_schema_creation_status(
252                            state, request, labels_api, query_map,
253                        )
254                        .await
255                    }
256                    _ => rest_json_error(404, "UnknownOperationException", "Not found"),
257                }
258            }
259            // Routes with 5 segments: /v1/apis/{apiId}/{sub}/{subId}
260            (_, 5) => {
261                let api_id = segments[2];
262                let sub = segments[3];
263                let sub_id = segments[4];
264                match (method, sub) {
265                    // POST /v1/apis/{apiId}/ApiCaches/update - UpdateApiCache
266                    ("POST", "ApiCaches") if sub_id == "update" => {
267                        let labels: &[(&str, &str)] = &[("apiId", api_id)];
268                        self.handle_update_api_cache(state, request, labels, query_map)
269                            .await
270                    }
271                    // DELETE /v1/apis/{apiId}/apikeys/{id} - DeleteApiKey
272                    ("DELETE", "apikeys") => {
273                        let labels: &[(&str, &str)] = &[("apiId", api_id), ("id", sub_id)];
274                        self.handle_delete_api_key(state, request, labels, query_map)
275                            .await
276                    }
277                    // POST /v1/apis/{apiId}/apikeys/{id} - UpdateApiKey
278                    ("POST", "apikeys") => {
279                        let labels: &[(&str, &str)] = &[("apiId", api_id), ("id", sub_id)];
280                        self.handle_update_api_key(state, request, labels, query_map)
281                            .await
282                    }
283                    // GET /v1/apis/{apiId}/types/{typeName} - GetType
284                    ("GET", "types") => {
285                        let labels: &[(&str, &str)] = &[("apiId", api_id), ("typeName", sub_id)];
286                        self.handle_get_type(state, request, labels, query_map)
287                            .await
288                    }
289                    _ => rest_json_error(404, "UnknownOperationException", "Not found"),
290                }
291            }
292            _ => rest_json_error(404, "UnknownOperationException", "Not found"),
293        }
294    }
295
296    // ===================== GraphQL API (v1) handlers =====================
297
298    async fn handle_create_graphql_api(
299        &self,
300        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
301        request: &MockRequest,
302        labels: &[(&str, &str)],
303        query: &HashMap<String, String>,
304        account_id: &str,
305        region: &str,
306    ) -> MockResponse {
307        let input = match wire::deserialize_create_graphql_api_request(request, labels, query) {
308            Ok(v) => v,
309            Err(e) => return rest_json_error(400, "ValidationException", &e),
310        };
311        if input.name.is_empty() {
312            return rest_json_error(400, "BadRequestException", "Missing 'name'");
313        }
314        if input.authentication_type.is_empty() {
315            return rest_json_error(400, "BadRequestException", "Missing 'authenticationType'");
316        }
317        let tags = input.tags.unwrap_or_default();
318
319        let mut s = state.write().await;
320        match s.create_graphql_api(
321            &input.name,
322            &input.authentication_type,
323            account_id,
324            region,
325            tags,
326        ) {
327            Ok(api) => {
328                let resp = wire::CreateGraphqlApiResponse {
329                    graphql_api: Some(graphql_api_to_wire(api)),
330                };
331                wire::serialize_create_graphql_api_response(&resp)
332            }
333            Err(e) => appsync_error_response(&e),
334        }
335    }
336
337    async fn handle_get_graphql_api(
338        &self,
339        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
340        request: &MockRequest,
341        labels: &[(&str, &str)],
342        query: &HashMap<String, String>,
343    ) -> MockResponse {
344        let input = match wire::deserialize_get_graphql_api_request(request, labels, query) {
345            Ok(v) => v,
346            Err(e) => return rest_json_error(400, "ValidationException", &e),
347        };
348        let s = state.read().await;
349        match s.get_graphql_api(&input.api_id) {
350            Ok(api) => {
351                let resp = wire::GetGraphqlApiResponse {
352                    graphql_api: Some(graphql_api_to_wire(api)),
353                };
354                wire::serialize_get_graphql_api_response(&resp)
355            }
356            Err(e) => appsync_error_response(&e),
357        }
358    }
359
360    async fn handle_delete_graphql_api(
361        &self,
362        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
363        request: &MockRequest,
364        labels: &[(&str, &str)],
365        query: &HashMap<String, String>,
366    ) -> MockResponse {
367        let input = match wire::deserialize_delete_graphql_api_request(request, labels, query) {
368            Ok(v) => v,
369            Err(e) => return rest_json_error(400, "ValidationException", &e),
370        };
371        let mut s = state.write().await;
372        match s.delete_graphql_api(&input.api_id) {
373            Ok(()) => {
374                let resp = wire::DeleteGraphqlApiResponse {};
375                wire::serialize_delete_graphql_api_response(&resp)
376            }
377            Err(e) => appsync_error_response(&e),
378        }
379    }
380
381    async fn handle_list_graphql_apis(
382        &self,
383        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
384        request: &MockRequest,
385        labels: &[(&str, &str)],
386        query: &HashMap<String, String>,
387    ) -> MockResponse {
388        let _input = match wire::deserialize_list_graphql_apis_request(request, labels, query) {
389            Ok(v) => v,
390            Err(e) => return rest_json_error(400, "ValidationException", &e),
391        };
392        let s = state.read().await;
393        let apis = s.list_graphql_apis();
394        let entries: Vec<wire::GraphqlApi> =
395            apis.iter().map(|api| graphql_api_to_wire(api)).collect();
396        let resp = wire::ListGraphqlApisResponse {
397            graphql_apis: Some(entries),
398            next_token: None,
399        };
400        wire::serialize_list_graphql_apis_response(&resp)
401    }
402
403    async fn handle_update_graphql_api(
404        &self,
405        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
406        request: &MockRequest,
407        labels: &[(&str, &str)],
408        query: &HashMap<String, String>,
409    ) -> MockResponse {
410        let input = match wire::deserialize_update_graphql_api_request(request, labels, query) {
411            Ok(v) => v,
412            Err(e) => return rest_json_error(400, "ValidationException", &e),
413        };
414        let name = if input.name.is_empty() {
415            None
416        } else {
417            Some(input.name.as_str())
418        };
419        let authentication_type = if input.authentication_type.is_empty() {
420            None
421        } else {
422            Some(input.authentication_type.as_str())
423        };
424
425        let mut s = state.write().await;
426        match s.update_graphql_api(&input.api_id, name, authentication_type) {
427            Ok(api) => {
428                let resp = wire::UpdateGraphqlApiResponse {
429                    graphql_api: Some(graphql_api_to_wire(api)),
430                };
431                wire::serialize_update_graphql_api_response(&resp)
432            }
433            Err(e) => appsync_error_response(&e),
434        }
435    }
436
437    // ===================== Event API (v2) handlers =====================
438
439    async fn handle_create_api(
440        &self,
441        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
442        request: &MockRequest,
443        labels: &[(&str, &str)],
444        query: &HashMap<String, String>,
445        account_id: &str,
446        region: &str,
447    ) -> MockResponse {
448        let input = match wire::deserialize_create_api_request(request, labels, query) {
449            Ok(v) => v,
450            Err(e) => return rest_json_error(400, "ValidationException", &e),
451        };
452        if input.name.is_empty() {
453            return rest_json_error(400, "BadRequestException", "Missing 'name'");
454        }
455        let owner_contact = input.owner_contact.as_deref();
456        let tags = input.tags.unwrap_or_default();
457
458        let mut s = state.write().await;
459        match s.create_api(&input.name, account_id, region, owner_contact, tags) {
460            Ok(api) => {
461                let resp = wire::CreateApiResponse {
462                    api: Some(api_to_wire(api)),
463                };
464                wire::serialize_create_api_response(&resp)
465            }
466            Err(e) => appsync_error_response(&e),
467        }
468    }
469
470    async fn handle_get_api(
471        &self,
472        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
473        request: &MockRequest,
474        labels: &[(&str, &str)],
475        query: &HashMap<String, String>,
476    ) -> MockResponse {
477        let input = match wire::deserialize_get_api_request(request, labels, query) {
478            Ok(v) => v,
479            Err(e) => return rest_json_error(400, "ValidationException", &e),
480        };
481        let s = state.read().await;
482        match s.get_api(&input.api_id) {
483            Ok(api) => {
484                let resp = wire::GetApiResponse {
485                    api: Some(api_to_wire(api)),
486                };
487                wire::serialize_get_api_response(&resp)
488            }
489            Err(e) => appsync_error_response(&e),
490        }
491    }
492
493    async fn handle_delete_api(
494        &self,
495        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
496        request: &MockRequest,
497        labels: &[(&str, &str)],
498        query: &HashMap<String, String>,
499    ) -> MockResponse {
500        let input = match wire::deserialize_delete_api_request(request, labels, query) {
501            Ok(v) => v,
502            Err(e) => return rest_json_error(400, "ValidationException", &e),
503        };
504        let mut s = state.write().await;
505        match s.delete_api(&input.api_id) {
506            Ok(()) => {
507                let resp = wire::DeleteApiResponse {};
508                wire::serialize_delete_api_response(&resp)
509            }
510            Err(e) => appsync_error_response(&e),
511        }
512    }
513
514    async fn handle_list_apis(
515        &self,
516        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
517        request: &MockRequest,
518        labels: &[(&str, &str)],
519        query: &HashMap<String, String>,
520    ) -> MockResponse {
521        let _input = match wire::deserialize_list_apis_request(request, labels, query) {
522            Ok(v) => v,
523            Err(e) => return rest_json_error(400, "ValidationException", &e),
524        };
525        let s = state.read().await;
526        let apis = s.list_apis();
527        let entries: Vec<wire::Api> = apis.iter().map(|api| api_to_wire(api)).collect();
528        let resp = wire::ListApisResponse {
529            apis: Some(entries),
530            next_token: None,
531        };
532        wire::serialize_list_apis_response(&resp)
533    }
534
535    // ===================== API Cache handlers =====================
536
537    async fn handle_create_api_cache(
538        &self,
539        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
540        request: &MockRequest,
541        labels: &[(&str, &str)],
542        query: &HashMap<String, String>,
543    ) -> MockResponse {
544        let input = match wire::deserialize_create_api_cache_request(request, labels, query) {
545            Ok(v) => v,
546            Err(e) => return rest_json_error(400, "ValidationException", &e),
547        };
548        if input.api_caching_behavior.is_empty() {
549            return rest_json_error(400, "BadRequestException", "Missing 'apiCachingBehavior'");
550        }
551        if input.r#type.is_empty() {
552            return rest_json_error(400, "BadRequestException", "Missing 'type'");
553        }
554        let ttl = if input.ttl == 0 { 3600 } else { input.ttl };
555        let at_rest = input.at_rest_encryption_enabled.unwrap_or(false);
556        let transit = input.transit_encryption_enabled.unwrap_or(false);
557        let health_metrics = input.health_metrics_config.as_deref();
558
559        let mut s = state.write().await;
560        match s.create_api_cache(
561            &input.api_id,
562            &input.api_caching_behavior,
563            &input.r#type,
564            ttl,
565            at_rest,
566            transit,
567            health_metrics,
568        ) {
569            Ok(cache) => {
570                let resp = wire::CreateApiCacheResponse {
571                    api_cache: Some(api_cache_to_wire(cache)),
572                };
573                wire::serialize_create_api_cache_response(&resp)
574            }
575            Err(e) => appsync_error_response(&e),
576        }
577    }
578
579    async fn handle_get_api_cache(
580        &self,
581        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
582        request: &MockRequest,
583        labels: &[(&str, &str)],
584        query: &HashMap<String, String>,
585    ) -> MockResponse {
586        let input = match wire::deserialize_get_api_cache_request(request, labels, query) {
587            Ok(v) => v,
588            Err(e) => return rest_json_error(400, "ValidationException", &e),
589        };
590        let s = state.read().await;
591        match s.get_api_cache(&input.api_id) {
592            Ok(cache) => {
593                let resp = wire::GetApiCacheResponse {
594                    api_cache: Some(api_cache_to_wire(cache)),
595                };
596                wire::serialize_get_api_cache_response(&resp)
597            }
598            Err(e) => appsync_error_response(&e),
599        }
600    }
601
602    async fn handle_delete_api_cache(
603        &self,
604        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
605        request: &MockRequest,
606        labels: &[(&str, &str)],
607        query: &HashMap<String, String>,
608    ) -> MockResponse {
609        let input = match wire::deserialize_delete_api_cache_request(request, labels, query) {
610            Ok(v) => v,
611            Err(e) => return rest_json_error(400, "ValidationException", &e),
612        };
613        let mut s = state.write().await;
614        match s.delete_api_cache(&input.api_id) {
615            Ok(()) => {
616                let resp = wire::DeleteApiCacheResponse {};
617                wire::serialize_delete_api_cache_response(&resp)
618            }
619            Err(e) => appsync_error_response(&e),
620        }
621    }
622
623    async fn handle_update_api_cache(
624        &self,
625        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
626        request: &MockRequest,
627        labels: &[(&str, &str)],
628        query: &HashMap<String, String>,
629    ) -> MockResponse {
630        let input = match wire::deserialize_update_api_cache_request(request, labels, query) {
631            Ok(v) => v,
632            Err(e) => return rest_json_error(400, "ValidationException", &e),
633        };
634        if input.api_caching_behavior.is_empty() {
635            return rest_json_error(400, "BadRequestException", "Missing 'apiCachingBehavior'");
636        }
637        if input.r#type.is_empty() {
638            return rest_json_error(400, "BadRequestException", "Missing 'type'");
639        }
640        let ttl = if input.ttl == 0 { 3600 } else { input.ttl };
641        let health_metrics = input.health_metrics_config.as_deref();
642
643        let mut s = state.write().await;
644        match s.update_api_cache(
645            &input.api_id,
646            &input.api_caching_behavior,
647            &input.r#type,
648            ttl,
649            health_metrics,
650        ) {
651            Ok(cache) => {
652                let resp = wire::UpdateApiCacheResponse {
653                    api_cache: Some(api_cache_to_wire(cache)),
654                };
655                wire::serialize_update_api_cache_response(&resp)
656            }
657            Err(e) => appsync_error_response(&e),
658        }
659    }
660
661    async fn handle_flush_api_cache(
662        &self,
663        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
664        request: &MockRequest,
665        labels: &[(&str, &str)],
666        query: &HashMap<String, String>,
667    ) -> MockResponse {
668        let input = match wire::deserialize_flush_api_cache_request(request, labels, query) {
669            Ok(v) => v,
670            Err(e) => return rest_json_error(400, "ValidationException", &e),
671        };
672        let s = state.read().await;
673        match s.flush_api_cache(&input.api_id) {
674            Ok(()) => {
675                let resp = wire::FlushApiCacheResponse {};
676                wire::serialize_flush_api_cache_response(&resp)
677            }
678            Err(e) => appsync_error_response(&e),
679        }
680    }
681
682    // ===================== API Key handlers =====================
683
684    async fn handle_create_api_key(
685        &self,
686        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
687        request: &MockRequest,
688        labels: &[(&str, &str)],
689        query: &HashMap<String, String>,
690    ) -> MockResponse {
691        let input = match wire::deserialize_create_api_key_request(request, labels, query) {
692            Ok(v) => v,
693            Err(e) => return rest_json_error(400, "ValidationException", &e),
694        };
695        let description = input.description.as_deref();
696        let expires = input.expires.unwrap_or_else(|| {
697            let now = std::time::SystemTime::now()
698                .duration_since(std::time::UNIX_EPOCH)
699                .unwrap_or_default()
700                .as_secs() as i64;
701            now + 7 * 24 * 60 * 60 // default: 7 days from now
702        });
703
704        let mut s = state.write().await;
705        match s.create_api_key(&input.api_id, description, expires) {
706            Ok(key) => {
707                let resp = wire::CreateApiKeyResponse {
708                    api_key: Some(api_key_to_wire(key)),
709                };
710                wire::serialize_create_api_key_response(&resp)
711            }
712            Err(e) => appsync_error_response(&e),
713        }
714    }
715
716    async fn handle_delete_api_key(
717        &self,
718        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
719        request: &MockRequest,
720        labels: &[(&str, &str)],
721        query: &HashMap<String, String>,
722    ) -> MockResponse {
723        let input = match wire::deserialize_delete_api_key_request(request, labels, query) {
724            Ok(v) => v,
725            Err(e) => return rest_json_error(400, "ValidationException", &e),
726        };
727        let mut s = state.write().await;
728        match s.delete_api_key(&input.api_id, &input.id) {
729            Ok(()) => {
730                let resp = wire::DeleteApiKeyResponse {};
731                wire::serialize_delete_api_key_response(&resp)
732            }
733            Err(e) => appsync_error_response(&e),
734        }
735    }
736
737    async fn handle_list_api_keys(
738        &self,
739        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
740        request: &MockRequest,
741        labels: &[(&str, &str)],
742        query: &HashMap<String, String>,
743    ) -> MockResponse {
744        let input = match wire::deserialize_list_api_keys_request(request, labels, query) {
745            Ok(v) => v,
746            Err(e) => return rest_json_error(400, "ValidationException", &e),
747        };
748        let s = state.read().await;
749        match s.list_api_keys(&input.api_id) {
750            Ok(keys) => {
751                let entries: Vec<wire::ApiKey> = keys.iter().map(|k| api_key_to_wire(k)).collect();
752                let resp = wire::ListApiKeysResponse {
753                    api_keys: Some(entries),
754                    next_token: None,
755                };
756                wire::serialize_list_api_keys_response(&resp)
757            }
758            Err(e) => appsync_error_response(&e),
759        }
760    }
761
762    async fn handle_update_api_key(
763        &self,
764        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
765        request: &MockRequest,
766        labels: &[(&str, &str)],
767        query: &HashMap<String, String>,
768    ) -> MockResponse {
769        let input = match wire::deserialize_update_api_key_request(request, labels, query) {
770            Ok(v) => v,
771            Err(e) => return rest_json_error(400, "ValidationException", &e),
772        };
773        let description = input.description.as_deref();
774        let expires = input.expires;
775
776        let mut s = state.write().await;
777        match s.update_api_key(&input.api_id, &input.id, description, expires) {
778            Ok(key) => {
779                let resp = wire::UpdateApiKeyResponse {
780                    api_key: Some(api_key_to_wire(key)),
781                };
782                wire::serialize_update_api_key_response(&resp)
783            }
784            Err(e) => appsync_error_response(&e),
785        }
786    }
787
788    // ===================== Channel Namespace handlers (v2) =====================
789
790    async fn handle_create_channel_namespace(
791        &self,
792        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
793        request: &MockRequest,
794        labels: &[(&str, &str)],
795        query: &HashMap<String, String>,
796        account_id: &str,
797        region: &str,
798    ) -> MockResponse {
799        let input = match wire::deserialize_create_channel_namespace_request(request, labels, query)
800        {
801            Ok(v) => v,
802            Err(e) => return rest_json_error(400, "ValidationException", &e),
803        };
804        if input.name.is_empty() {
805            return rest_json_error(400, "BadRequestException", "Missing 'name'");
806        }
807        let tags = input.tags.unwrap_or_default();
808
809        let mut s = state.write().await;
810        match s.create_channel_namespace(&input.api_id, &input.name, account_id, region, tags) {
811            Ok(ns) => {
812                let resp = wire::CreateChannelNamespaceResponse {
813                    channel_namespace: Some(channel_namespace_to_wire(ns)),
814                };
815                wire::serialize_create_channel_namespace_response(&resp)
816            }
817            Err(e) => appsync_error_response(&e),
818        }
819    }
820
821    async fn handle_delete_channel_namespace(
822        &self,
823        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
824        request: &MockRequest,
825        labels: &[(&str, &str)],
826        query: &HashMap<String, String>,
827    ) -> MockResponse {
828        let input = match wire::deserialize_delete_channel_namespace_request(request, labels, query)
829        {
830            Ok(v) => v,
831            Err(e) => return rest_json_error(400, "ValidationException", &e),
832        };
833        let mut s = state.write().await;
834        match s.delete_channel_namespace(&input.api_id, &input.name) {
835            Ok(()) => {
836                let resp = wire::DeleteChannelNamespaceResponse {};
837                wire::serialize_delete_channel_namespace_response(&resp)
838            }
839            Err(e) => appsync_error_response(&e),
840        }
841    }
842
843    async fn handle_list_channel_namespaces(
844        &self,
845        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
846        request: &MockRequest,
847        labels: &[(&str, &str)],
848        query: &HashMap<String, String>,
849    ) -> MockResponse {
850        let input = match wire::deserialize_list_channel_namespaces_request(request, labels, query)
851        {
852            Ok(v) => v,
853            Err(e) => return rest_json_error(400, "ValidationException", &e),
854        };
855        let s = state.read().await;
856        match s.list_channel_namespaces(&input.api_id) {
857            Ok(nss) => {
858                let entries: Vec<wire::ChannelNamespace> =
859                    nss.iter().map(|ns| channel_namespace_to_wire(ns)).collect();
860                let resp = wire::ListChannelNamespacesResponse {
861                    channel_namespaces: Some(entries),
862                    next_token: None,
863                };
864                wire::serialize_list_channel_namespaces_response(&resp)
865            }
866            Err(e) => appsync_error_response(&e),
867        }
868    }
869
870    // ===================== Schema handlers =====================
871
872    async fn handle_start_schema_creation(
873        &self,
874        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
875        request: &MockRequest,
876        labels: &[(&str, &str)],
877        query: &HashMap<String, String>,
878    ) -> MockResponse {
879        let input = match wire::deserialize_start_schema_creation_request(request, labels, query) {
880            Ok(v) => v,
881            Err(e) => return rest_json_error(400, "ValidationException", &e),
882        };
883
884        let mut s = state.write().await;
885        match s.start_schema_creation(&input.api_id, input.definition.as_bytes()) {
886            Ok(status) => {
887                let resp = wire::StartSchemaCreationResponse {
888                    status: Some(status.status.clone()),
889                };
890                wire::serialize_start_schema_creation_response(&resp)
891            }
892            Err(e) => appsync_error_response(&e),
893        }
894    }
895
896    async fn handle_get_schema_creation_status(
897        &self,
898        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
899        request: &MockRequest,
900        labels: &[(&str, &str)],
901        query: &HashMap<String, String>,
902    ) -> MockResponse {
903        let input =
904            match wire::deserialize_get_schema_creation_status_request(request, labels, query) {
905                Ok(v) => v,
906                Err(e) => return rest_json_error(400, "ValidationException", &e),
907            };
908        let s = state.read().await;
909        match s.get_schema_creation_status(&input.api_id) {
910            Ok(status) => {
911                let resp = wire::GetSchemaCreationStatusResponse {
912                    status: Some(status.status.clone()),
913                    details: status.details.clone(),
914                };
915                wire::serialize_get_schema_creation_status_response(&resp)
916            }
917            Err(e) => appsync_error_response(&e),
918        }
919    }
920
921    // ===================== Type handlers =====================
922
923    async fn handle_get_type(
924        &self,
925        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
926        request: &MockRequest,
927        labels: &[(&str, &str)],
928        query: &HashMap<String, String>,
929    ) -> MockResponse {
930        let input = match wire::deserialize_get_type_request(request, labels, query) {
931            Ok(v) => v,
932            Err(e) => return rest_json_error(400, "ValidationException", &e),
933        };
934        let format = if input.format.is_empty() {
935            "SDL"
936        } else {
937            input.format.as_str()
938        };
939        let s = state.read().await;
940        match s.get_type(&input.api_id, &input.type_name, format) {
941            Ok(t) => {
942                let resp = wire::GetTypeResponse {
943                    r#type: Some(type_to_wire(t)),
944                };
945                wire::serialize_get_type_response(&resp)
946            }
947            Err(e) => appsync_error_response(&e),
948        }
949    }
950
951    // ===================== Tag handlers =====================
952
953    async fn handle_list_tags_for_resource(
954        &self,
955        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
956        request: &MockRequest,
957        labels: &[(&str, &str)],
958        query: &HashMap<String, String>,
959    ) -> MockResponse {
960        let input = match wire::deserialize_list_tags_for_resource_request(request, labels, query) {
961            Ok(v) => v,
962            Err(e) => return rest_json_error(400, "ValidationException", &e),
963        };
964        let s = state.read().await;
965        let tags = s.list_tags_for_resource(&input.resource_arn);
966        wire::serialize_list_tags_for_resource_response(&wire::ListTagsForResourceResponse {
967            tags: Some(tags),
968        })
969    }
970
971    async fn handle_tag_resource(
972        &self,
973        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
974        request: &MockRequest,
975        labels: &[(&str, &str)],
976        query: &HashMap<String, String>,
977    ) -> MockResponse {
978        let input = match wire::deserialize_tag_resource_request(request, labels, query) {
979            Ok(v) => v,
980            Err(e) => return rest_json_error(400, "ValidationException", &e),
981        };
982        let mut s = state.write().await;
983        s.tag_resource(&input.resource_arn, input.tags);
984        wire::serialize_tag_resource_response(&wire::TagResourceResponse {})
985    }
986
987    async fn handle_untag_resource(
988        &self,
989        state: &Arc<tokio::sync::RwLock<AppSyncState>>,
990        request: &MockRequest,
991        labels: &[(&str, &str)],
992        query: &HashMap<String, String>,
993        raw_query_string: &str,
994    ) -> MockResponse {
995        let input = match wire::deserialize_untag_resource_request(request, labels, query) {
996            Ok(v) => v,
997            Err(e) => return rest_json_error(400, "ValidationException", &e),
998        };
999        // The AWS SDK encodes tagKeys as repeated query params
1000        // (`?tagKeys=key1&tagKeys=key2`), but `parse_query_string` only retains the
1001        // last value, and the wire deserializer comma-splits a single value. Re-extract
1002        // from the raw query string to preserve multi-key untag semantics.
1003        let tag_keys = extract_query_list(raw_query_string, "tagKeys");
1004        let tag_keys = if tag_keys.is_empty() {
1005            input.tag_keys
1006        } else {
1007            tag_keys
1008        };
1009        let mut s = state.write().await;
1010        s.untag_resource(&input.resource_arn, &tag_keys);
1011        wire::serialize_untag_resource_response(&wire::UntagResourceResponse {})
1012    }
1013}
1014
1015// ===================== Wire type converters =====================
1016
1017fn graphql_api_to_wire(api: &crate::types::GraphqlApi) -> wire::GraphqlApi {
1018    wire::GraphqlApi {
1019        api_id: Some(api.api_id.clone()),
1020        name: Some(api.name.clone()),
1021        authentication_type: Some(api.authentication_type.clone()),
1022        arn: Some(api.arn.clone()),
1023        uris: Some(api.uris.clone()),
1024        tags: if api.tags.is_empty() {
1025            None
1026        } else {
1027            Some(api.tags.clone())
1028        },
1029        ..Default::default()
1030    }
1031}
1032
1033fn api_to_wire(api: &crate::types::Api) -> wire::Api {
1034    wire::Api {
1035        api_id: Some(api.api_id.clone()),
1036        name: Some(api.name.clone()),
1037        api_arn: Some(api.api_arn.clone()),
1038        created: Some(api.created),
1039        tags: if api.tags.is_empty() {
1040            None
1041        } else {
1042            Some(api.tags.clone())
1043        },
1044        owner_contact: api.owner_contact.clone(),
1045        ..Default::default()
1046    }
1047}
1048
1049fn api_cache_to_wire(cache: &crate::types::ApiCacheEntry) -> wire::ApiCache {
1050    wire::ApiCache {
1051        api_caching_behavior: Some(cache.api_caching_behavior.clone()),
1052        r#type: Some(cache.r#type.clone()),
1053        ttl: Some(cache.ttl),
1054        at_rest_encryption_enabled: Some(cache.at_rest_encryption_enabled),
1055        transit_encryption_enabled: Some(cache.transit_encryption_enabled),
1056        status: Some(cache.status.clone()),
1057        health_metrics_config: cache.health_metrics_config.clone(),
1058    }
1059}
1060
1061fn api_key_to_wire(key: &crate::types::ApiKeyEntry) -> wire::ApiKey {
1062    wire::ApiKey {
1063        id: Some(key.id.clone()),
1064        description: key.description.clone(),
1065        expires: Some(key.expires),
1066        deletes: Some(key.deletes),
1067    }
1068}
1069
1070fn channel_namespace_to_wire(ns: &crate::types::ChannelNamespaceEntry) -> wire::ChannelNamespace {
1071    wire::ChannelNamespace {
1072        api_id: Some(ns.api_id.clone()),
1073        name: Some(ns.name.clone()),
1074        channel_namespace_arn: Some(ns.channel_namespace_arn.clone()),
1075        created: Some(ns.created),
1076        last_modified: Some(ns.last_modified),
1077        tags: if ns.tags.is_empty() {
1078            None
1079        } else {
1080            Some(ns.tags.clone())
1081        },
1082        ..Default::default()
1083    }
1084}
1085
1086fn type_to_wire(t: &crate::types::TypeEntry) -> wire::Type {
1087    wire::Type {
1088        name: Some(t.name.clone()),
1089        definition: t.definition.clone(),
1090        format: Some(t.format.clone()),
1091        arn: Some(t.arn.clone()),
1092        ..Default::default()
1093    }
1094}
1095
1096// ===================== Utility functions =====================
1097
/// Splits a request URI into `(path, query)`.
///
/// Everything up to and including the first occurrence of `amazonaws.com` is
/// discarded; if that marker is absent the whole input is used. The remainder
/// is split at the first `?`, which is not included in either part. The query
/// is empty when there is no `?`.
fn extract_path_and_query(uri: &str) -> (String, String) {
    const HOST_SUFFIX: &str = "amazonaws.com";

    let tail = match uri.find(HOST_SUFFIX) {
        Some(pos) => &uri[pos + HOST_SUFFIX.len()..],
        None => uri,
    };

    match tail.split_once('?') {
        Some((path, query)) => (path.to_owned(), query.to_owned()),
        None => (tail.to_owned(), String::new()),
    }
}
1110
/// Collects every value of the repeated query parameter `key`, in order.
///
/// `query_string` is the raw (still percent-encoded) query without the leading
/// `?`. Pairs without an `=` are ignored. Each matching value is decoded with
/// [`percent_decode`], so any escaped character in a value is restored rather
/// than only a handful of hard-coded escapes.
fn extract_query_list(query_string: &str, key: &str) -> Vec<String> {
    query_string
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .filter(|(k, _)| *k == key)
        .map(|(_, v)| percent_decode(v))
        .collect()
}

/// Decodes a percent-encoded URL component.
///
/// * Every `%XX` escape (upper- or lower-case hex) becomes the byte it encodes.
/// * `+` becomes a space, as before; an escaped `%2B` yields a literal `+`.
/// * A `%` that is not followed by two hex digits is kept verbatim.
/// * Decoding is a single pass, so `%253A` yields `%3A`, not `:`.
/// * Decoded bytes that are not valid UTF-8 are replaced with U+FFFD.
fn percent_decode(s: &str) -> String {
    fn hex_value(b: u8) -> Option<u8> {
        match b {
            b'0'..=b'9' => Some(b - b'0'),
            b'a'..=b'f' => Some(b - b'a' + 10),
            b'A'..=b'F' => Some(b - b'A' + 10),
            _ => None,
        }
    }

    let bytes = s.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'+' => {
                decoded.push(b' ');
                i += 1;
            }
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_value);
                let lo = bytes.get(i + 2).copied().and_then(hex_value);
                match (hi, lo) {
                    (Some(hi), Some(lo)) => {
                        decoded.push((hi << 4) | lo);
                        i += 3;
                    }
                    // Malformed escape: keep the '%' and continue with the next byte.
                    _ => {
                        decoded.push(b'%');
                        i += 1;
                    }
                }
            }
            other => {
                decoded.push(other);
                i += 1;
            }
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
1137
1138fn appsync_error_response(err: &AppSyncError) -> MockResponse {
1139    let (status, error_type) = match err {
1140        AppSyncError::GraphqlApiNotFound { .. } => (404u16, "NotFoundException"),
1141        AppSyncError::ApiNotFound { .. } => (404, "NotFoundException"),
1142        AppSyncError::ApiCacheNotFound { .. } => (404, "NotFoundException"),
1143        AppSyncError::ApiCacheAlreadyExists { .. } => (400, "BadRequestException"),
1144        AppSyncError::ApiKeyNotFound { .. } => (404, "NotFoundException"),
1145        AppSyncError::ChannelNamespaceNotFound { .. } => (404, "NotFoundException"),
1146        AppSyncError::SchemaNotFound { .. } => (404, "NotFoundException"),
1147        AppSyncError::TypeNotFound { .. } => (404, "NotFoundException"),
1148    };
1149    let body = json!({
1150        "Type": "User",
1151        "Message": err.to_string(),
1152    });
1153    let mut resp = MockResponse::rest_json(status, body.to_string());
1154    resp.headers
1155        .insert(X_AMZN_ERRORTYPE, error_type.parse().unwrap());
1156    resp
1157}
1158
1159fn rest_json_error(status: u16, code: &str, message: &str) -> MockResponse {
1160    let body = json!({
1161        "Type": "User",
1162        "Message": message,
1163    });
1164    let mut resp = MockResponse::rest_json(status, body.to_string());
1165    resp.headers.insert(X_AMZN_ERRORTYPE, code.parse().unwrap());
1166    resp
1167}