1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use http::header::HeaderName;
7use serde_json::json;
8use winterbaume_core::{
9 BackendState, MockRequest, MockResponse, MockService, StateChangeNotifier, StatefulService,
10 default_account_id,
11};
12
13use crate::state::{AppSyncError, AppSyncState};
14use crate::views::AppsyncStateView;
15use crate::wire;
16
/// Name of the `x-amzn-errortype` response header.
///
/// The error helpers in this file insert the error type code (for example
/// `NotFoundException`) under this header on every error response they build.
const X_AMZN_ERRORTYPE: HeaderName = HeaderName::from_static("x-amzn-errortype");
18
/// Mock service answering requests for the `appsync` endpoints.
pub struct AppSyncService {
    /// Backend state store; `dispatch` selects the state for a request with
    /// `get(account_id, region)`.
    pub(crate) state: Arc<BackendState<AppSyncState>>,
    /// Change notifier typed over `AppsyncStateView`.
    /// NOTE(review): not used directly in this part of the file — presumably
    /// driven by `notify_state_changed`; confirm against its definition.
    pub(crate) notifier: StateChangeNotifier<AppsyncStateView>,
}
23
24impl AppSyncService {
25 pub fn new() -> Self {
26 Self {
27 state: Arc::new(BackendState::new()),
28 notifier: StateChangeNotifier::new(),
29 }
30 }
31}
32
33impl Default for AppSyncService {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl MockService for AppSyncService {
40 fn service_name(&self) -> &str {
41 "appsync"
42 }
43
44 fn url_patterns(&self) -> Vec<&str> {
45 vec![r"https?://appsync\.(.+)\.amazonaws\.com"]
46 }
47
48 fn handle(
49 &self,
50 request: MockRequest,
51 ) -> Pin<Box<dyn Future<Output = MockResponse> + Send + '_>> {
52 Box::pin(async move { self.dispatch(request).await })
53 }
54}
55
56impl AppSyncService {
57 async fn dispatch(&self, request: MockRequest) -> MockResponse {
58 let region = winterbaume_core::auth::extract_region_from_uri(&request.uri);
59 let account_id = default_account_id();
60 let state = self.state.get(account_id, ®ion);
61
62 let (path, query_string) = extract_path_and_query(&request.uri);
63 let method = request.method.as_str();
64
65 let segments: Vec<&str> = path.trim_start_matches('/').split('/').collect();
66 let query_map: HashMap<String, String> =
67 winterbaume_core::parse_query_string(&query_string);
68
69 let response = if segments.is_empty() {
70 rest_json_error(404, "UnknownOperationException", "Not found")
71 } else if segments.len() >= 3 && segments[0] == "v1" && segments[1] == "tags" {
72 let resource_arn = percent_decode(&segments[2..].join("/"));
74 let labels: &[(&str, &str)] = &[("resourceArn", resource_arn.as_str())];
75 match method {
76 "GET" => {
77 self.handle_list_tags_for_resource(&state, &request, labels, &query_map)
78 .await
79 }
80 "POST" => {
81 self.handle_tag_resource(&state, &request, labels, &query_map)
82 .await
83 }
84 "DELETE" => {
85 self.handle_untag_resource(&state, &request, labels, &query_map, &query_string)
86 .await
87 }
88 _ => rest_json_error(404, "UnknownOperationException", "Not found"),
89 }
90 } else if segments.len() >= 2 && segments[0] == "v2" && segments[1] == "apis" {
91 self.dispatch_v2(
93 method, &segments, &request, &query_map, account_id, ®ion, &state,
94 )
95 .await
96 } else if segments.len() >= 2 && segments[0] == "v1" && segments[1] == "apis" {
97 self.dispatch_v1(
99 method, &segments, &request, &query_map, account_id, ®ion, &state,
100 )
101 .await
102 } else {
103 rest_json_error(404, "UnknownOperationException", "Not found")
104 };
105
106 if matches!(method, "PUT" | "POST" | "DELETE") && response.status / 100 == 2 {
107 self.notify_state_changed(account_id, ®ion).await;
108 }
109 response
110 }
111
112 #[allow(clippy::too_many_arguments)]
113 async fn dispatch_v2(
114 &self,
115 method: &str,
116 segments: &[&str],
117 request: &MockRequest,
118 query_map: &HashMap<String, String>,
119 account_id: &str,
120 region: &str,
121 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
122 ) -> MockResponse {
123 match (method, segments.len()) {
125 ("POST", 2) => {
127 self.handle_create_api(state, request, &[], query_map, account_id, region)
128 .await
129 }
130 ("GET", 2) => self.handle_list_apis(state, request, &[], query_map).await,
132 ("GET", 3) => {
134 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
135 self.handle_get_api(state, request, labels, query_map).await
136 }
137 ("DELETE", 3) => {
139 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
140 self.handle_delete_api(state, request, labels, query_map)
141 .await
142 }
143 ("POST", 4) if segments[3] == "channelNamespaces" => {
145 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
146 self.handle_create_channel_namespace(
147 state, request, labels, query_map, account_id, region,
148 )
149 .await
150 }
151 ("GET", 4) if segments[3] == "channelNamespaces" => {
153 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
154 self.handle_list_channel_namespaces(state, request, labels, query_map)
155 .await
156 }
157 ("DELETE", 5) if segments[3] == "channelNamespaces" => {
159 let labels: &[(&str, &str)] = &[("apiId", segments[2]), ("name", segments[4])];
160 self.handle_delete_channel_namespace(state, request, labels, query_map)
161 .await
162 }
163 _ => rest_json_error(404, "UnknownOperationException", "Not found"),
164 }
165 }
166
167 #[allow(clippy::too_many_arguments)]
168 async fn dispatch_v1(
169 &self,
170 method: &str,
171 segments: &[&str],
172 request: &MockRequest,
173 query_map: &HashMap<String, String>,
174 account_id: &str,
175 region: &str,
176 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
177 ) -> MockResponse {
178 match (method, segments.len()) {
180 ("POST", 2) => {
182 self.handle_create_graphql_api(state, request, &[], query_map, account_id, region)
183 .await
184 }
185 ("GET", 2) => {
187 self.handle_list_graphql_apis(state, request, &[], query_map)
188 .await
189 }
190 ("GET", 3) => {
192 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
193 self.handle_get_graphql_api(state, request, labels, query_map)
194 .await
195 }
196 ("POST", 3) => {
198 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
199 self.handle_update_graphql_api(state, request, labels, query_map)
200 .await
201 }
202 ("DELETE", 3) => {
204 let labels: &[(&str, &str)] = &[("apiId", segments[2])];
205 self.handle_delete_graphql_api(state, request, labels, query_map)
206 .await
207 }
208 (_, 4) => {
210 let api_id = segments[2];
211 let sub = segments[3];
212 let labels_api: &[(&str, &str)] = &[("apiId", api_id)];
213 match (method, sub) {
214 ("POST", "ApiCaches") => {
216 self.handle_create_api_cache(state, request, labels_api, query_map)
217 .await
218 }
219 ("GET", "ApiCaches") => {
221 self.handle_get_api_cache(state, request, labels_api, query_map)
222 .await
223 }
224 ("DELETE", "ApiCaches") => {
226 self.handle_delete_api_cache(state, request, labels_api, query_map)
227 .await
228 }
229 ("POST", "apikeys") => {
231 self.handle_create_api_key(state, request, labels_api, query_map)
232 .await
233 }
234 ("GET", "apikeys") => {
236 self.handle_list_api_keys(state, request, labels_api, query_map)
237 .await
238 }
239 ("DELETE", "FlushCache") => {
241 self.handle_flush_api_cache(state, request, labels_api, query_map)
242 .await
243 }
244 ("POST", "schemacreation") => {
246 self.handle_start_schema_creation(state, request, labels_api, query_map)
247 .await
248 }
249 ("GET", "schemacreation") => {
251 self.handle_get_schema_creation_status(
252 state, request, labels_api, query_map,
253 )
254 .await
255 }
256 _ => rest_json_error(404, "UnknownOperationException", "Not found"),
257 }
258 }
259 (_, 5) => {
261 let api_id = segments[2];
262 let sub = segments[3];
263 let sub_id = segments[4];
264 match (method, sub) {
265 ("POST", "ApiCaches") if sub_id == "update" => {
267 let labels: &[(&str, &str)] = &[("apiId", api_id)];
268 self.handle_update_api_cache(state, request, labels, query_map)
269 .await
270 }
271 ("DELETE", "apikeys") => {
273 let labels: &[(&str, &str)] = &[("apiId", api_id), ("id", sub_id)];
274 self.handle_delete_api_key(state, request, labels, query_map)
275 .await
276 }
277 ("POST", "apikeys") => {
279 let labels: &[(&str, &str)] = &[("apiId", api_id), ("id", sub_id)];
280 self.handle_update_api_key(state, request, labels, query_map)
281 .await
282 }
283 ("GET", "types") => {
285 let labels: &[(&str, &str)] = &[("apiId", api_id), ("typeName", sub_id)];
286 self.handle_get_type(state, request, labels, query_map)
287 .await
288 }
289 _ => rest_json_error(404, "UnknownOperationException", "Not found"),
290 }
291 }
292 _ => rest_json_error(404, "UnknownOperationException", "Not found"),
293 }
294 }
295
296 async fn handle_create_graphql_api(
299 &self,
300 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
301 request: &MockRequest,
302 labels: &[(&str, &str)],
303 query: &HashMap<String, String>,
304 account_id: &str,
305 region: &str,
306 ) -> MockResponse {
307 let input = match wire::deserialize_create_graphql_api_request(request, labels, query) {
308 Ok(v) => v,
309 Err(e) => return rest_json_error(400, "ValidationException", &e),
310 };
311 if input.name.is_empty() {
312 return rest_json_error(400, "BadRequestException", "Missing 'name'");
313 }
314 if input.authentication_type.is_empty() {
315 return rest_json_error(400, "BadRequestException", "Missing 'authenticationType'");
316 }
317 let tags = input.tags.unwrap_or_default();
318
319 let mut s = state.write().await;
320 match s.create_graphql_api(
321 &input.name,
322 &input.authentication_type,
323 account_id,
324 region,
325 tags,
326 ) {
327 Ok(api) => {
328 let resp = wire::CreateGraphqlApiResponse {
329 graphql_api: Some(graphql_api_to_wire(api)),
330 };
331 wire::serialize_create_graphql_api_response(&resp)
332 }
333 Err(e) => appsync_error_response(&e),
334 }
335 }
336
337 async fn handle_get_graphql_api(
338 &self,
339 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
340 request: &MockRequest,
341 labels: &[(&str, &str)],
342 query: &HashMap<String, String>,
343 ) -> MockResponse {
344 let input = match wire::deserialize_get_graphql_api_request(request, labels, query) {
345 Ok(v) => v,
346 Err(e) => return rest_json_error(400, "ValidationException", &e),
347 };
348 let s = state.read().await;
349 match s.get_graphql_api(&input.api_id) {
350 Ok(api) => {
351 let resp = wire::GetGraphqlApiResponse {
352 graphql_api: Some(graphql_api_to_wire(api)),
353 };
354 wire::serialize_get_graphql_api_response(&resp)
355 }
356 Err(e) => appsync_error_response(&e),
357 }
358 }
359
360 async fn handle_delete_graphql_api(
361 &self,
362 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
363 request: &MockRequest,
364 labels: &[(&str, &str)],
365 query: &HashMap<String, String>,
366 ) -> MockResponse {
367 let input = match wire::deserialize_delete_graphql_api_request(request, labels, query) {
368 Ok(v) => v,
369 Err(e) => return rest_json_error(400, "ValidationException", &e),
370 };
371 let mut s = state.write().await;
372 match s.delete_graphql_api(&input.api_id) {
373 Ok(()) => {
374 let resp = wire::DeleteGraphqlApiResponse {};
375 wire::serialize_delete_graphql_api_response(&resp)
376 }
377 Err(e) => appsync_error_response(&e),
378 }
379 }
380
381 async fn handle_list_graphql_apis(
382 &self,
383 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
384 request: &MockRequest,
385 labels: &[(&str, &str)],
386 query: &HashMap<String, String>,
387 ) -> MockResponse {
388 let _input = match wire::deserialize_list_graphql_apis_request(request, labels, query) {
389 Ok(v) => v,
390 Err(e) => return rest_json_error(400, "ValidationException", &e),
391 };
392 let s = state.read().await;
393 let apis = s.list_graphql_apis();
394 let entries: Vec<wire::GraphqlApi> =
395 apis.iter().map(|api| graphql_api_to_wire(api)).collect();
396 let resp = wire::ListGraphqlApisResponse {
397 graphql_apis: Some(entries),
398 next_token: None,
399 };
400 wire::serialize_list_graphql_apis_response(&resp)
401 }
402
403 async fn handle_update_graphql_api(
404 &self,
405 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
406 request: &MockRequest,
407 labels: &[(&str, &str)],
408 query: &HashMap<String, String>,
409 ) -> MockResponse {
410 let input = match wire::deserialize_update_graphql_api_request(request, labels, query) {
411 Ok(v) => v,
412 Err(e) => return rest_json_error(400, "ValidationException", &e),
413 };
414 let name = if input.name.is_empty() {
415 None
416 } else {
417 Some(input.name.as_str())
418 };
419 let authentication_type = if input.authentication_type.is_empty() {
420 None
421 } else {
422 Some(input.authentication_type.as_str())
423 };
424
425 let mut s = state.write().await;
426 match s.update_graphql_api(&input.api_id, name, authentication_type) {
427 Ok(api) => {
428 let resp = wire::UpdateGraphqlApiResponse {
429 graphql_api: Some(graphql_api_to_wire(api)),
430 };
431 wire::serialize_update_graphql_api_response(&resp)
432 }
433 Err(e) => appsync_error_response(&e),
434 }
435 }
436
437 async fn handle_create_api(
440 &self,
441 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
442 request: &MockRequest,
443 labels: &[(&str, &str)],
444 query: &HashMap<String, String>,
445 account_id: &str,
446 region: &str,
447 ) -> MockResponse {
448 let input = match wire::deserialize_create_api_request(request, labels, query) {
449 Ok(v) => v,
450 Err(e) => return rest_json_error(400, "ValidationException", &e),
451 };
452 if input.name.is_empty() {
453 return rest_json_error(400, "BadRequestException", "Missing 'name'");
454 }
455 let owner_contact = input.owner_contact.as_deref();
456 let tags = input.tags.unwrap_or_default();
457
458 let mut s = state.write().await;
459 match s.create_api(&input.name, account_id, region, owner_contact, tags) {
460 Ok(api) => {
461 let resp = wire::CreateApiResponse {
462 api: Some(api_to_wire(api)),
463 };
464 wire::serialize_create_api_response(&resp)
465 }
466 Err(e) => appsync_error_response(&e),
467 }
468 }
469
470 async fn handle_get_api(
471 &self,
472 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
473 request: &MockRequest,
474 labels: &[(&str, &str)],
475 query: &HashMap<String, String>,
476 ) -> MockResponse {
477 let input = match wire::deserialize_get_api_request(request, labels, query) {
478 Ok(v) => v,
479 Err(e) => return rest_json_error(400, "ValidationException", &e),
480 };
481 let s = state.read().await;
482 match s.get_api(&input.api_id) {
483 Ok(api) => {
484 let resp = wire::GetApiResponse {
485 api: Some(api_to_wire(api)),
486 };
487 wire::serialize_get_api_response(&resp)
488 }
489 Err(e) => appsync_error_response(&e),
490 }
491 }
492
493 async fn handle_delete_api(
494 &self,
495 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
496 request: &MockRequest,
497 labels: &[(&str, &str)],
498 query: &HashMap<String, String>,
499 ) -> MockResponse {
500 let input = match wire::deserialize_delete_api_request(request, labels, query) {
501 Ok(v) => v,
502 Err(e) => return rest_json_error(400, "ValidationException", &e),
503 };
504 let mut s = state.write().await;
505 match s.delete_api(&input.api_id) {
506 Ok(()) => {
507 let resp = wire::DeleteApiResponse {};
508 wire::serialize_delete_api_response(&resp)
509 }
510 Err(e) => appsync_error_response(&e),
511 }
512 }
513
514 async fn handle_list_apis(
515 &self,
516 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
517 request: &MockRequest,
518 labels: &[(&str, &str)],
519 query: &HashMap<String, String>,
520 ) -> MockResponse {
521 let _input = match wire::deserialize_list_apis_request(request, labels, query) {
522 Ok(v) => v,
523 Err(e) => return rest_json_error(400, "ValidationException", &e),
524 };
525 let s = state.read().await;
526 let apis = s.list_apis();
527 let entries: Vec<wire::Api> = apis.iter().map(|api| api_to_wire(api)).collect();
528 let resp = wire::ListApisResponse {
529 apis: Some(entries),
530 next_token: None,
531 };
532 wire::serialize_list_apis_response(&resp)
533 }
534
535 async fn handle_create_api_cache(
538 &self,
539 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
540 request: &MockRequest,
541 labels: &[(&str, &str)],
542 query: &HashMap<String, String>,
543 ) -> MockResponse {
544 let input = match wire::deserialize_create_api_cache_request(request, labels, query) {
545 Ok(v) => v,
546 Err(e) => return rest_json_error(400, "ValidationException", &e),
547 };
548 if input.api_caching_behavior.is_empty() {
549 return rest_json_error(400, "BadRequestException", "Missing 'apiCachingBehavior'");
550 }
551 if input.r#type.is_empty() {
552 return rest_json_error(400, "BadRequestException", "Missing 'type'");
553 }
554 let ttl = if input.ttl == 0 { 3600 } else { input.ttl };
555 let at_rest = input.at_rest_encryption_enabled.unwrap_or(false);
556 let transit = input.transit_encryption_enabled.unwrap_or(false);
557 let health_metrics = input.health_metrics_config.as_deref();
558
559 let mut s = state.write().await;
560 match s.create_api_cache(
561 &input.api_id,
562 &input.api_caching_behavior,
563 &input.r#type,
564 ttl,
565 at_rest,
566 transit,
567 health_metrics,
568 ) {
569 Ok(cache) => {
570 let resp = wire::CreateApiCacheResponse {
571 api_cache: Some(api_cache_to_wire(cache)),
572 };
573 wire::serialize_create_api_cache_response(&resp)
574 }
575 Err(e) => appsync_error_response(&e),
576 }
577 }
578
579 async fn handle_get_api_cache(
580 &self,
581 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
582 request: &MockRequest,
583 labels: &[(&str, &str)],
584 query: &HashMap<String, String>,
585 ) -> MockResponse {
586 let input = match wire::deserialize_get_api_cache_request(request, labels, query) {
587 Ok(v) => v,
588 Err(e) => return rest_json_error(400, "ValidationException", &e),
589 };
590 let s = state.read().await;
591 match s.get_api_cache(&input.api_id) {
592 Ok(cache) => {
593 let resp = wire::GetApiCacheResponse {
594 api_cache: Some(api_cache_to_wire(cache)),
595 };
596 wire::serialize_get_api_cache_response(&resp)
597 }
598 Err(e) => appsync_error_response(&e),
599 }
600 }
601
602 async fn handle_delete_api_cache(
603 &self,
604 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
605 request: &MockRequest,
606 labels: &[(&str, &str)],
607 query: &HashMap<String, String>,
608 ) -> MockResponse {
609 let input = match wire::deserialize_delete_api_cache_request(request, labels, query) {
610 Ok(v) => v,
611 Err(e) => return rest_json_error(400, "ValidationException", &e),
612 };
613 let mut s = state.write().await;
614 match s.delete_api_cache(&input.api_id) {
615 Ok(()) => {
616 let resp = wire::DeleteApiCacheResponse {};
617 wire::serialize_delete_api_cache_response(&resp)
618 }
619 Err(e) => appsync_error_response(&e),
620 }
621 }
622
623 async fn handle_update_api_cache(
624 &self,
625 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
626 request: &MockRequest,
627 labels: &[(&str, &str)],
628 query: &HashMap<String, String>,
629 ) -> MockResponse {
630 let input = match wire::deserialize_update_api_cache_request(request, labels, query) {
631 Ok(v) => v,
632 Err(e) => return rest_json_error(400, "ValidationException", &e),
633 };
634 if input.api_caching_behavior.is_empty() {
635 return rest_json_error(400, "BadRequestException", "Missing 'apiCachingBehavior'");
636 }
637 if input.r#type.is_empty() {
638 return rest_json_error(400, "BadRequestException", "Missing 'type'");
639 }
640 let ttl = if input.ttl == 0 { 3600 } else { input.ttl };
641 let health_metrics = input.health_metrics_config.as_deref();
642
643 let mut s = state.write().await;
644 match s.update_api_cache(
645 &input.api_id,
646 &input.api_caching_behavior,
647 &input.r#type,
648 ttl,
649 health_metrics,
650 ) {
651 Ok(cache) => {
652 let resp = wire::UpdateApiCacheResponse {
653 api_cache: Some(api_cache_to_wire(cache)),
654 };
655 wire::serialize_update_api_cache_response(&resp)
656 }
657 Err(e) => appsync_error_response(&e),
658 }
659 }
660
661 async fn handle_flush_api_cache(
662 &self,
663 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
664 request: &MockRequest,
665 labels: &[(&str, &str)],
666 query: &HashMap<String, String>,
667 ) -> MockResponse {
668 let input = match wire::deserialize_flush_api_cache_request(request, labels, query) {
669 Ok(v) => v,
670 Err(e) => return rest_json_error(400, "ValidationException", &e),
671 };
672 let s = state.read().await;
673 match s.flush_api_cache(&input.api_id) {
674 Ok(()) => {
675 let resp = wire::FlushApiCacheResponse {};
676 wire::serialize_flush_api_cache_response(&resp)
677 }
678 Err(e) => appsync_error_response(&e),
679 }
680 }
681
682 async fn handle_create_api_key(
685 &self,
686 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
687 request: &MockRequest,
688 labels: &[(&str, &str)],
689 query: &HashMap<String, String>,
690 ) -> MockResponse {
691 let input = match wire::deserialize_create_api_key_request(request, labels, query) {
692 Ok(v) => v,
693 Err(e) => return rest_json_error(400, "ValidationException", &e),
694 };
695 let description = input.description.as_deref();
696 let expires = input.expires.unwrap_or_else(|| {
697 let now = std::time::SystemTime::now()
698 .duration_since(std::time::UNIX_EPOCH)
699 .unwrap_or_default()
700 .as_secs() as i64;
701 now + 7 * 24 * 60 * 60 });
703
704 let mut s = state.write().await;
705 match s.create_api_key(&input.api_id, description, expires) {
706 Ok(key) => {
707 let resp = wire::CreateApiKeyResponse {
708 api_key: Some(api_key_to_wire(key)),
709 };
710 wire::serialize_create_api_key_response(&resp)
711 }
712 Err(e) => appsync_error_response(&e),
713 }
714 }
715
716 async fn handle_delete_api_key(
717 &self,
718 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
719 request: &MockRequest,
720 labels: &[(&str, &str)],
721 query: &HashMap<String, String>,
722 ) -> MockResponse {
723 let input = match wire::deserialize_delete_api_key_request(request, labels, query) {
724 Ok(v) => v,
725 Err(e) => return rest_json_error(400, "ValidationException", &e),
726 };
727 let mut s = state.write().await;
728 match s.delete_api_key(&input.api_id, &input.id) {
729 Ok(()) => {
730 let resp = wire::DeleteApiKeyResponse {};
731 wire::serialize_delete_api_key_response(&resp)
732 }
733 Err(e) => appsync_error_response(&e),
734 }
735 }
736
737 async fn handle_list_api_keys(
738 &self,
739 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
740 request: &MockRequest,
741 labels: &[(&str, &str)],
742 query: &HashMap<String, String>,
743 ) -> MockResponse {
744 let input = match wire::deserialize_list_api_keys_request(request, labels, query) {
745 Ok(v) => v,
746 Err(e) => return rest_json_error(400, "ValidationException", &e),
747 };
748 let s = state.read().await;
749 match s.list_api_keys(&input.api_id) {
750 Ok(keys) => {
751 let entries: Vec<wire::ApiKey> = keys.iter().map(|k| api_key_to_wire(k)).collect();
752 let resp = wire::ListApiKeysResponse {
753 api_keys: Some(entries),
754 next_token: None,
755 };
756 wire::serialize_list_api_keys_response(&resp)
757 }
758 Err(e) => appsync_error_response(&e),
759 }
760 }
761
762 async fn handle_update_api_key(
763 &self,
764 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
765 request: &MockRequest,
766 labels: &[(&str, &str)],
767 query: &HashMap<String, String>,
768 ) -> MockResponse {
769 let input = match wire::deserialize_update_api_key_request(request, labels, query) {
770 Ok(v) => v,
771 Err(e) => return rest_json_error(400, "ValidationException", &e),
772 };
773 let description = input.description.as_deref();
774 let expires = input.expires;
775
776 let mut s = state.write().await;
777 match s.update_api_key(&input.api_id, &input.id, description, expires) {
778 Ok(key) => {
779 let resp = wire::UpdateApiKeyResponse {
780 api_key: Some(api_key_to_wire(key)),
781 };
782 wire::serialize_update_api_key_response(&resp)
783 }
784 Err(e) => appsync_error_response(&e),
785 }
786 }
787
788 async fn handle_create_channel_namespace(
791 &self,
792 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
793 request: &MockRequest,
794 labels: &[(&str, &str)],
795 query: &HashMap<String, String>,
796 account_id: &str,
797 region: &str,
798 ) -> MockResponse {
799 let input = match wire::deserialize_create_channel_namespace_request(request, labels, query)
800 {
801 Ok(v) => v,
802 Err(e) => return rest_json_error(400, "ValidationException", &e),
803 };
804 if input.name.is_empty() {
805 return rest_json_error(400, "BadRequestException", "Missing 'name'");
806 }
807 let tags = input.tags.unwrap_or_default();
808
809 let mut s = state.write().await;
810 match s.create_channel_namespace(&input.api_id, &input.name, account_id, region, tags) {
811 Ok(ns) => {
812 let resp = wire::CreateChannelNamespaceResponse {
813 channel_namespace: Some(channel_namespace_to_wire(ns)),
814 };
815 wire::serialize_create_channel_namespace_response(&resp)
816 }
817 Err(e) => appsync_error_response(&e),
818 }
819 }
820
821 async fn handle_delete_channel_namespace(
822 &self,
823 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
824 request: &MockRequest,
825 labels: &[(&str, &str)],
826 query: &HashMap<String, String>,
827 ) -> MockResponse {
828 let input = match wire::deserialize_delete_channel_namespace_request(request, labels, query)
829 {
830 Ok(v) => v,
831 Err(e) => return rest_json_error(400, "ValidationException", &e),
832 };
833 let mut s = state.write().await;
834 match s.delete_channel_namespace(&input.api_id, &input.name) {
835 Ok(()) => {
836 let resp = wire::DeleteChannelNamespaceResponse {};
837 wire::serialize_delete_channel_namespace_response(&resp)
838 }
839 Err(e) => appsync_error_response(&e),
840 }
841 }
842
843 async fn handle_list_channel_namespaces(
844 &self,
845 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
846 request: &MockRequest,
847 labels: &[(&str, &str)],
848 query: &HashMap<String, String>,
849 ) -> MockResponse {
850 let input = match wire::deserialize_list_channel_namespaces_request(request, labels, query)
851 {
852 Ok(v) => v,
853 Err(e) => return rest_json_error(400, "ValidationException", &e),
854 };
855 let s = state.read().await;
856 match s.list_channel_namespaces(&input.api_id) {
857 Ok(nss) => {
858 let entries: Vec<wire::ChannelNamespace> =
859 nss.iter().map(|ns| channel_namespace_to_wire(ns)).collect();
860 let resp = wire::ListChannelNamespacesResponse {
861 channel_namespaces: Some(entries),
862 next_token: None,
863 };
864 wire::serialize_list_channel_namespaces_response(&resp)
865 }
866 Err(e) => appsync_error_response(&e),
867 }
868 }
869
870 async fn handle_start_schema_creation(
873 &self,
874 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
875 request: &MockRequest,
876 labels: &[(&str, &str)],
877 query: &HashMap<String, String>,
878 ) -> MockResponse {
879 let input = match wire::deserialize_start_schema_creation_request(request, labels, query) {
880 Ok(v) => v,
881 Err(e) => return rest_json_error(400, "ValidationException", &e),
882 };
883
884 let mut s = state.write().await;
885 match s.start_schema_creation(&input.api_id, input.definition.as_bytes()) {
886 Ok(status) => {
887 let resp = wire::StartSchemaCreationResponse {
888 status: Some(status.status.clone()),
889 };
890 wire::serialize_start_schema_creation_response(&resp)
891 }
892 Err(e) => appsync_error_response(&e),
893 }
894 }
895
896 async fn handle_get_schema_creation_status(
897 &self,
898 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
899 request: &MockRequest,
900 labels: &[(&str, &str)],
901 query: &HashMap<String, String>,
902 ) -> MockResponse {
903 let input =
904 match wire::deserialize_get_schema_creation_status_request(request, labels, query) {
905 Ok(v) => v,
906 Err(e) => return rest_json_error(400, "ValidationException", &e),
907 };
908 let s = state.read().await;
909 match s.get_schema_creation_status(&input.api_id) {
910 Ok(status) => {
911 let resp = wire::GetSchemaCreationStatusResponse {
912 status: Some(status.status.clone()),
913 details: status.details.clone(),
914 };
915 wire::serialize_get_schema_creation_status_response(&resp)
916 }
917 Err(e) => appsync_error_response(&e),
918 }
919 }
920
921 async fn handle_get_type(
924 &self,
925 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
926 request: &MockRequest,
927 labels: &[(&str, &str)],
928 query: &HashMap<String, String>,
929 ) -> MockResponse {
930 let input = match wire::deserialize_get_type_request(request, labels, query) {
931 Ok(v) => v,
932 Err(e) => return rest_json_error(400, "ValidationException", &e),
933 };
934 let format = if input.format.is_empty() {
935 "SDL"
936 } else {
937 input.format.as_str()
938 };
939 let s = state.read().await;
940 match s.get_type(&input.api_id, &input.type_name, format) {
941 Ok(t) => {
942 let resp = wire::GetTypeResponse {
943 r#type: Some(type_to_wire(t)),
944 };
945 wire::serialize_get_type_response(&resp)
946 }
947 Err(e) => appsync_error_response(&e),
948 }
949 }
950
951 async fn handle_list_tags_for_resource(
954 &self,
955 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
956 request: &MockRequest,
957 labels: &[(&str, &str)],
958 query: &HashMap<String, String>,
959 ) -> MockResponse {
960 let input = match wire::deserialize_list_tags_for_resource_request(request, labels, query) {
961 Ok(v) => v,
962 Err(e) => return rest_json_error(400, "ValidationException", &e),
963 };
964 let s = state.read().await;
965 let tags = s.list_tags_for_resource(&input.resource_arn);
966 wire::serialize_list_tags_for_resource_response(&wire::ListTagsForResourceResponse {
967 tags: Some(tags),
968 })
969 }
970
971 async fn handle_tag_resource(
972 &self,
973 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
974 request: &MockRequest,
975 labels: &[(&str, &str)],
976 query: &HashMap<String, String>,
977 ) -> MockResponse {
978 let input = match wire::deserialize_tag_resource_request(request, labels, query) {
979 Ok(v) => v,
980 Err(e) => return rest_json_error(400, "ValidationException", &e),
981 };
982 let mut s = state.write().await;
983 s.tag_resource(&input.resource_arn, input.tags);
984 wire::serialize_tag_resource_response(&wire::TagResourceResponse {})
985 }
986
987 async fn handle_untag_resource(
988 &self,
989 state: &Arc<tokio::sync::RwLock<AppSyncState>>,
990 request: &MockRequest,
991 labels: &[(&str, &str)],
992 query: &HashMap<String, String>,
993 raw_query_string: &str,
994 ) -> MockResponse {
995 let input = match wire::deserialize_untag_resource_request(request, labels, query) {
996 Ok(v) => v,
997 Err(e) => return rest_json_error(400, "ValidationException", &e),
998 };
999 let tag_keys = extract_query_list(raw_query_string, "tagKeys");
1004 let tag_keys = if tag_keys.is_empty() {
1005 input.tag_keys
1006 } else {
1007 tag_keys
1008 };
1009 let mut s = state.write().await;
1010 s.untag_resource(&input.resource_arn, &tag_keys);
1011 wire::serialize_untag_resource_response(&wire::UntagResourceResponse {})
1012 }
1013}
1014
1015fn graphql_api_to_wire(api: &crate::types::GraphqlApi) -> wire::GraphqlApi {
1018 wire::GraphqlApi {
1019 api_id: Some(api.api_id.clone()),
1020 name: Some(api.name.clone()),
1021 authentication_type: Some(api.authentication_type.clone()),
1022 arn: Some(api.arn.clone()),
1023 uris: Some(api.uris.clone()),
1024 tags: if api.tags.is_empty() {
1025 None
1026 } else {
1027 Some(api.tags.clone())
1028 },
1029 ..Default::default()
1030 }
1031}
1032
1033fn api_to_wire(api: &crate::types::Api) -> wire::Api {
1034 wire::Api {
1035 api_id: Some(api.api_id.clone()),
1036 name: Some(api.name.clone()),
1037 api_arn: Some(api.api_arn.clone()),
1038 created: Some(api.created),
1039 tags: if api.tags.is_empty() {
1040 None
1041 } else {
1042 Some(api.tags.clone())
1043 },
1044 owner_contact: api.owner_contact.clone(),
1045 ..Default::default()
1046 }
1047}
1048
1049fn api_cache_to_wire(cache: &crate::types::ApiCacheEntry) -> wire::ApiCache {
1050 wire::ApiCache {
1051 api_caching_behavior: Some(cache.api_caching_behavior.clone()),
1052 r#type: Some(cache.r#type.clone()),
1053 ttl: Some(cache.ttl),
1054 at_rest_encryption_enabled: Some(cache.at_rest_encryption_enabled),
1055 transit_encryption_enabled: Some(cache.transit_encryption_enabled),
1056 status: Some(cache.status.clone()),
1057 health_metrics_config: cache.health_metrics_config.clone(),
1058 }
1059}
1060
1061fn api_key_to_wire(key: &crate::types::ApiKeyEntry) -> wire::ApiKey {
1062 wire::ApiKey {
1063 id: Some(key.id.clone()),
1064 description: key.description.clone(),
1065 expires: Some(key.expires),
1066 deletes: Some(key.deletes),
1067 }
1068}
1069
1070fn channel_namespace_to_wire(ns: &crate::types::ChannelNamespaceEntry) -> wire::ChannelNamespace {
1071 wire::ChannelNamespace {
1072 api_id: Some(ns.api_id.clone()),
1073 name: Some(ns.name.clone()),
1074 channel_namespace_arn: Some(ns.channel_namespace_arn.clone()),
1075 created: Some(ns.created),
1076 last_modified: Some(ns.last_modified),
1077 tags: if ns.tags.is_empty() {
1078 None
1079 } else {
1080 Some(ns.tags.clone())
1081 },
1082 ..Default::default()
1083 }
1084}
1085
1086fn type_to_wire(t: &crate::types::TypeEntry) -> wire::Type {
1087 wire::Type {
1088 name: Some(t.name.clone()),
1089 definition: t.definition.clone(),
1090 format: Some(t.format.clone()),
1091 arn: Some(t.arn.clone()),
1092 ..Default::default()
1093 }
1094}
1095
/// Splits a request URI into `(path, query_string)`.
///
/// Everything up to and including the first `amazonaws.com` is discarded; if
/// that marker is absent the whole input is used. The remainder is split at
/// the first `?`, which is not included in either part. Without a `?` the
/// query string is empty.
fn extract_path_and_query(uri: &str) -> (String, String) {
    const HOST_SUFFIX: &str = "amazonaws.com";

    let after_host = match uri.find(HOST_SUFFIX) {
        Some(start) => &uri[start + HOST_SUFFIX.len()..],
        None => uri,
    };
    match after_host.split_once('?') {
        Some((path, query)) => (path.to_owned(), query.to_owned()),
        None => (after_host.to_owned(), String::new()),
    }
}
1110
/// Collects every value of a repeated query parameter.
///
/// `query_string` is the raw query (without the leading `?`). Each
/// `name=value` pair whose raw name equals `key` contributes one entry, in
/// order of appearance; pairs without `=` are ignored.
///
/// Values are fully URL-decoded: `+` becomes a space and every well-formed
/// `%XX` escape is decoded, with hex digits matched case-insensitively.
/// Malformed escapes are kept verbatim, and decoded bytes that are not valid
/// UTF-8 are replaced with U+FFFD.
fn extract_query_list(query_string: &str, key: &str) -> Vec<String> {
    /// Numeric value of one ASCII hex digit, if it is one.
    fn hex_value(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    /// URL-decodes a single query value.
    fn decode_value(raw: &str) -> String {
        let bytes = raw.as_bytes();
        let mut decoded = Vec::with_capacity(bytes.len());
        let mut i = 0;
        while i < bytes.len() {
            match bytes[i] {
                b'%' => {
                    let hi = bytes.get(i + 1).copied().and_then(hex_value);
                    let lo = bytes.get(i + 2).copied().and_then(hex_value);
                    if let (Some(hi), Some(lo)) = (hi, lo) {
                        decoded.push((hi << 4) | lo);
                        i += 3;
                        continue;
                    }
                    // Not a valid escape: keep the '%' as-is.
                    decoded.push(b'%');
                }
                b'+' => decoded.push(b' '),
                other => decoded.push(other),
            }
            i += 1;
        }
        String::from_utf8_lossy(&decoded).into_owned()
    }

    query_string
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .filter(|(name, _)| *name == key)
        .map(|(_, value)| decode_value(value))
        .collect()
}
1130
/// Decodes a percent-encoded URL component.
///
/// Every well-formed `%XX` escape is decoded, with hex digits matched
/// case-insensitively, so `%3a` and `%3A` both become `:`. A `+` is turned
/// into a space, as this helper has always done. Malformed escapes are kept
/// verbatim, and decoded bytes that are not valid UTF-8 are replaced with
/// U+FFFD.
fn percent_decode(s: &str) -> String {
    /// Numeric value of one ASCII hex digit, if it is one.
    fn hex_value(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    let bytes = s.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' => {
                let hi = bytes.get(i + 1).copied().and_then(hex_value);
                let lo = bytes.get(i + 2).copied().and_then(hex_value);
                if let (Some(hi), Some(lo)) = (hi, lo) {
                    decoded.push((hi << 4) | lo);
                    i += 3;
                    continue;
                }
                // Not a valid escape: keep the '%' as-is.
                decoded.push(b'%');
            }
            // Literal '+' is mapped before escapes are decoded, so an encoded
            // plus (`%2B`) survives as '+'.
            b'+' => decoded.push(b' '),
            other => decoded.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
1137
1138fn appsync_error_response(err: &AppSyncError) -> MockResponse {
1139 let (status, error_type) = match err {
1140 AppSyncError::GraphqlApiNotFound { .. } => (404u16, "NotFoundException"),
1141 AppSyncError::ApiNotFound { .. } => (404, "NotFoundException"),
1142 AppSyncError::ApiCacheNotFound { .. } => (404, "NotFoundException"),
1143 AppSyncError::ApiCacheAlreadyExists { .. } => (400, "BadRequestException"),
1144 AppSyncError::ApiKeyNotFound { .. } => (404, "NotFoundException"),
1145 AppSyncError::ChannelNamespaceNotFound { .. } => (404, "NotFoundException"),
1146 AppSyncError::SchemaNotFound { .. } => (404, "NotFoundException"),
1147 AppSyncError::TypeNotFound { .. } => (404, "NotFoundException"),
1148 };
1149 let body = json!({
1150 "Type": "User",
1151 "Message": err.to_string(),
1152 });
1153 let mut resp = MockResponse::rest_json(status, body.to_string());
1154 resp.headers
1155 .insert(X_AMZN_ERRORTYPE, error_type.parse().unwrap());
1156 resp
1157}
1158
1159fn rest_json_error(status: u16, code: &str, message: &str) -> MockResponse {
1160 let body = json!({
1161 "Type": "User",
1162 "Message": message,
1163 });
1164 let mut resp = MockResponse::rest_json(status, body.to_string());
1165 resp.headers.insert(X_AMZN_ERRORTYPE, code.parse().unwrap());
1166 resp
1167}