1use apollo_compiler::ExecutableDocument;
2use apollo_compiler::Name;
3use apollo_compiler::collections::HashMap;
4use apollo_compiler::collections::IndexMap;
5use apollo_compiler::collections::IndexSet;
6use apollo_compiler::executable::Selection;
7use apollo_compiler::executable::SelectionSet;
8use encoding_rs::Encoding;
9use encoding_rs::UTF_8;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::header::CONTENT_LENGTH;
13use http::header::CONTENT_TYPE;
14use http::response::Parts;
15use itertools::Itertools;
16use mime::Mime;
17use serde_json_bytes::ByteString;
18use serde_json_bytes::Map;
19use serde_json_bytes::Value;
20
21use crate::connectors::Connector;
22use crate::connectors::JSONSelection;
23use crate::connectors::ProblemLocation;
24use crate::connectors::runtime::errors::RuntimeError;
25use crate::connectors::runtime::inputs::ContextReader;
26use crate::connectors::runtime::key::ResponseKey;
27use crate::connectors::runtime::mapping::Problem;
28use crate::connectors::runtime::mapping::aggregate_apply_to_errors;
29use crate::connectors::runtime::responses::DeserializeError::ContentDecoding;
30
31const ENTITIES: &str = "_entities";
32const TYPENAME: &str = "__typename";
33
34#[derive(Debug, thiserror::Error)]
35pub enum HandleResponseError {
36 #[error("Merge error: {0}")]
37 MergeError(String),
38}
39
40pub fn deserialize_response(body: &[u8], headers: &HeaderMap) -> Result<Value, DeserializeError> {
42 if headers
44 .get(CONTENT_LENGTH)
45 .and_then(|len| len.to_str().ok())
46 .and_then(|s| s.parse::<usize>().ok())
47 .is_some_and(|content_length| content_length == 0)
48 {
49 return Ok(Value::Null);
50 }
51
52 let content_type = headers
53 .get(CONTENT_TYPE)
54 .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
55
56 if content_type.is_none()
57 || content_type
58 .as_ref()
59 .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
60 {
61 serde_json::from_slice::<Value>(body).map_err(DeserializeError::SerdeJson)
64 } else if content_type
65 .as_ref()
66 .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
67 {
68 let encoding = content_type
71 .as_ref()
72 .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
73 .unwrap_or(UTF_8);
74 let (decoded_body, _, had_errors) = encoding.decode(body);
75
76 if had_errors {
77 return Err(ContentDecoding(encoding.name()));
78 }
79
80 Ok(Value::String(decoded_body.into_owned().into()))
81 } else {
82 Ok(Value::Null)
84 }
85}
86
87#[derive(Debug, thiserror::Error)]
88pub enum DeserializeError {
89 #[error("Could not parse JSON: {0}")]
90 SerdeJson(#[source] serde_json::Error),
91 #[error("Could not decode data with content encoding {0}")]
92 ContentDecoding(&'static str),
93}
94
95pub fn handle_raw_response(
96 data: &Value,
97 parts: &Parts,
98 key: ResponseKey,
99 connector: &Connector,
100 context: impl ContextReader,
101 client_headers: &HeaderMap<HeaderValue>,
102) -> MappedResponse {
103 let inputs = key
104 .inputs()
105 .clone()
106 .merger(&connector.response_variable_keys)
107 .config(connector.config.as_ref())
108 .context(context)
109 .status(parts.status.as_u16())
110 .request(&connector.response_headers, client_headers)
111 .response(&connector.response_headers, Some(parts))
112 .merge();
113 let warnings = Vec::new();
114 let (success, warnings) = is_success(connector, data, parts, &inputs, warnings);
115 if success {
116 map_response(data, key, inputs, warnings)
117 } else {
118 map_error(connector, data, parts, key, inputs, warnings)
119 }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123struct GraphQLDataMapper<'a> {
124 doc: &'a ExecutableDocument,
125 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
126}
127
128impl<'a> GraphQLDataMapper<'a> {
129 fn new(
130 doc: &'a ExecutableDocument,
131 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
132 ) -> Self {
133 Self { doc, subtypes_map }
134 }
135
136 fn fragment_matches(&self, data: &Value, fragment_type_condition: &Name) -> bool {
137 if let Some(data_typename) = data.get("__typename") {
138 match data_typename {
139 Value::String(typename) => {
140 self.supertype_has_subtype(fragment_type_condition.as_str(), typename.as_str())
141 }
142 _ => false,
143 }
144 } else {
145 true
146 }
147 }
148
149 fn supertype_has_subtype(&self, supertype: &str, subtype: &str) -> bool {
150 if supertype == subtype {
151 true
152 } else if let Some(subtypes) = self.subtypes_map.get(supertype) {
153 subtypes
154 .iter()
155 .any(|s| self.supertype_has_subtype(s, subtype))
156 } else {
157 false
158 }
159 }
160
161 fn map_data(&self, data: &Value, selection_set: &SelectionSet) -> Value {
162 if selection_set.selections.is_empty() {
163 return data.clone();
164 }
165
166 match data {
167 Value::Object(map) => {
168 let mut new_map = Map::new();
169
170 for field in selection_set.selections.iter() {
171 match field {
172 Selection::Field(field) => {
173 if let Some(field_value) = map.get(field.name.as_str()) {
174 let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
175 new_map.insert(
176 output_field_name.to_string(),
177 self.map_data(field_value, &field.selection_set),
178 );
179 }
180 }
181
182 Selection::FragmentSpread(spread) => {
183 if let Some(fragment) =
184 self.doc.fragments.get(spread.fragment_name.as_str())
185 && self.fragment_matches(data, fragment.type_condition())
186 {
187 let mapped = self.map_data(data, &fragment.selection_set);
188 if let Some(fragment_map) = mapped.as_object() {
189 new_map.extend(fragment_map.clone());
190 }
191 }
192 }
193
194 Selection::InlineFragment(fragment) => {
195 if let Some(type_condition) = &fragment.type_condition
196 && !self.fragment_matches(data, type_condition)
197 {
198 continue;
199 }
200 let mapped = self.map_data(data, &fragment.selection_set);
201 if let Some(fragment_map) = mapped.as_object() {
202 new_map.extend(fragment_map.clone());
203 }
204 }
205 }
206 }
207
208 Value::Object(new_map)
209 }
210
211 Value::Array(items) => Value::Array(
212 items
213 .iter()
214 .map(|item| self.map_data(item, selection_set))
215 .collect(),
216 ),
217
218 primitive => primitive.clone(),
219 }
220 }
221}
222
223fn is_success(
226 connector: &Connector,
227 data: &Value,
228 parts: &Parts,
229 inputs: &IndexMap<String, Value>,
230 mut warnings: Vec<Problem>,
231) -> (bool, Vec<Problem>) {
232 let Some(is_success_selection) = &connector.error_settings.connect_is_success else {
233 return (parts.status.is_success(), warnings);
234 };
235 let (res, apply_to_errors) = is_success_selection.apply_with_vars(data, inputs);
236 warnings.extend(aggregate_apply_to_errors(
237 apply_to_errors,
238 ProblemLocation::IsSuccess,
239 ));
240
241 (
242 res.as_ref().and_then(Value::as_bool).unwrap_or_default(),
243 warnings,
244 )
245}
246
247pub(super) fn map_response(
249 data: &Value,
250 key: ResponseKey,
251 inputs: IndexMap<String, Value>,
252 mut warnings: Vec<Problem>,
253) -> MappedResponse {
254 let (res, apply_to_errors) = key.selection().apply_with_vars(data, &inputs);
255 warnings.extend(aggregate_apply_to_errors(
256 apply_to_errors,
257 ProblemLocation::Selection,
258 ));
259 MappedResponse::Data {
260 key,
261 data: res.unwrap_or_else(|| Value::Null),
262 problems: warnings,
263 }
264}
265
266pub(super) fn map_error(
268 connector: &Connector,
269 data: &Value,
270 parts: &Parts,
271 key: ResponseKey,
272 inputs: IndexMap<String, Value>,
273 mut warnings: Vec<Problem>,
274) -> MappedResponse {
275 let message = if let Some(message_selection) = &connector.error_settings.message {
277 let (res, apply_to_errors) = message_selection.apply_with_vars(data, &inputs);
278 warnings.extend(aggregate_apply_to_errors(
279 apply_to_errors,
280 ProblemLocation::ErrorsMessage,
281 ));
282 res.as_ref()
283 .and_then(Value::as_str)
284 .unwrap_or_default()
285 .to_string()
286 } else {
287 "Request failed".to_string()
288 };
289
290 let mut error = RuntimeError::new(message, &key);
292 error.subgraph_name = Some(connector.id.subgraph_name.clone());
293 error.coordinate = Some(connector.id.coordinate());
294
295 error = error.extension(
297 "http",
298 Value::Object(Map::from_iter([(
299 "status".into(),
300 Value::Number(parts.status.as_u16().into()),
301 )])),
302 );
303
304 let mut extension_code = "CONNECTOR_FETCH".to_string();
309 if let Some(extensions_selection) = &connector.error_settings.source_extensions {
310 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
311 warnings.extend(aggregate_apply_to_errors(
312 apply_to_errors,
313 ProblemLocation::SourceErrorsExtensions,
314 ));
315
316 let extensions = res
318 .and_then(|e| match e {
319 Value::Object(map) => Some(map),
320 _ => None,
321 })
322 .unwrap_or_default();
323
324 if let Some(code) = extensions.get("code") {
325 extension_code = code.as_str().unwrap_or_default().to_string();
326 }
327
328 for (key, value) in extensions {
329 error = error.extension(key, value);
330 }
331 }
332
333 if let Some(extensions_selection) = &connector.error_settings.connect_extensions {
334 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
335 warnings.extend(aggregate_apply_to_errors(
336 apply_to_errors,
337 ProblemLocation::ConnectErrorsExtensions,
338 ));
339
340 let extensions = res
342 .and_then(|e| match e {
343 Value::Object(map) => Some(map),
344 _ => None,
345 })
346 .unwrap_or_default();
347
348 if let Some(code) = extensions.get("code") {
349 extension_code = code.as_str().unwrap_or_default().to_string();
350 }
351
352 for (key, value) in extensions {
353 error = error.extension(key, value);
354 }
355 }
356
357 error = error.with_code(extension_code);
358
359 MappedResponse::Error {
360 error,
361 key,
362 problems: warnings,
363 }
364}
365#[derive(Debug)]
367pub enum MappedResponse {
368 Error {
371 error: RuntimeError,
372 key: ResponseKey,
373 problems: Vec<Problem>,
374 },
375 Data {
377 data: Value,
378 key: ResponseKey,
379 problems: Vec<Problem>,
380 },
381}
382
383impl MappedResponse {
384 pub fn add_to_data(
388 self,
389 data: &mut Map<ByteString, Value>,
390 errors: &mut Vec<RuntimeError>,
391 count: usize,
392 ) -> Result<(), HandleResponseError> {
393 match self {
394 Self::Error { error, key, .. } => {
395 match key {
396 ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => {
398 let entities = data
399 .entry(ENTITIES)
400 .or_insert(Value::Array(Vec::with_capacity(count)));
401 entities
402 .as_array_mut()
403 .ok_or_else(|| {
404 HandleResponseError::MergeError("_entities is not an array".into())
405 })?
406 .insert(index, Value::Null);
407 }
408 _ => {}
409 };
410 errors.push(error);
411 }
412 Self::Data {
413 data: value, key, ..
414 } => match key {
415 ResponseKey::RootField { ref name, .. } => {
416 data.insert(name.clone(), value);
417 }
418 ResponseKey::Entity { index, .. } => {
419 let entities = data
420 .entry(ENTITIES)
421 .or_insert(Value::Array(Vec::with_capacity(count)));
422 entities
423 .as_array_mut()
424 .ok_or_else(|| {
425 HandleResponseError::MergeError("_entities is not an array".into())
426 })?
427 .insert(index, value);
428 }
429 ResponseKey::EntityField {
430 index,
431 ref field_name,
432 ref typename,
433 ..
434 } => {
435 let entities = data
436 .entry(ENTITIES)
437 .or_insert(Value::Array(Vec::with_capacity(count)))
438 .as_array_mut()
439 .ok_or_else(|| {
440 HandleResponseError::MergeError("_entities is not an array".into())
441 })?;
442
443 match entities.get_mut(index) {
444 Some(Value::Object(entity)) => {
445 entity.insert(field_name.clone(), value);
446 }
447 _ => {
448 let mut entity = Map::new();
449 if let Some(typename) = typename {
450 entity.insert(TYPENAME, Value::String(typename.as_str().into()));
451 }
452 entity.insert(field_name.clone(), value);
453 entities.insert(index, Value::Object(entity));
454 }
455 };
456 }
457 ResponseKey::BatchEntity {
458 selection,
459 keys,
460 inputs,
461 } => {
462 let Value::Array(values) = value else {
463 return Err(HandleResponseError::MergeError(
464 "Response for a batch request does not map to an array".into(),
465 ));
466 };
467
468 let spec = selection.spec();
469 let key_selection = JSONSelection::parse_with_spec(
470 &keys.serialize().no_indent().to_string(),
471 spec,
472 )
473 .map_err(|e| HandleResponseError::MergeError(e.to_string()))?;
474
475 let key_values = inputs.batch.iter().map(|v| {
477 key_selection
478 .apply_to(&Value::Object(v.clone()))
479 .0
480 .unwrap_or(Value::Null)
481 });
482
483 let mut map = values
485 .into_iter()
486 .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v)))
487 .collect::<HashMap<_, _>>();
488
489 let new_entities = key_values
491 .map(|key| map.remove(&key).unwrap_or(Value::Null))
492 .collect_vec();
493
494 let entities = data
496 .entry(ENTITIES)
497 .or_insert(Value::Array(Vec::with_capacity(count)));
498
499 entities
500 .as_array_mut()
501 .ok_or_else(|| {
502 HandleResponseError::MergeError("_entities is not an array".into())
503 })?
504 .extend(new_entities);
505 }
506 },
507 }
508
509 Ok(())
510 }
511
512 pub fn problems(&self) -> &[Problem] {
513 match self {
514 Self::Error { problems, .. } | Self::Data { problems, .. } => problems,
515 }
516 }
517
518 pub fn apply_operation(
531 self, operation_option: Option<&ExecutableDocument>,
533 subtypes: &IndexMap<String, IndexSet<String>>,
534 ) -> Self {
535 match (self, operation_option) {
536 (
537 Self::Data {
538 data,
539 key,
540 problems,
541 },
542 Some(operation),
543 ) => {
544 let single_op = operation
545 .operations
546 .anonymous
547 .as_ref()
548 .or_else(|| operation.operations.named.values().next());
549
550 let data = if let Some(op) = single_op {
551 let mut new_sub = SelectionSet::new(op.selection_set.ty.clone());
552
553 match &key {
554 ResponseKey::RootField { name, .. } => {
555 for field in op.selection_set.selections.iter() {
556 if let Selection::Field(field) = field
557 && field.name.as_str() == name.as_str()
558 {
559 new_sub
560 .selections
561 .extend(field.selection_set.selections.iter().cloned());
562 }
563 }
564 }
565
566 ResponseKey::EntityField { field_name, .. } => {
567 let field_str = field_name.as_str();
568
569 for selection in op.selection_set.selections.iter() {
570 if let Selection::Field(field) = selection
571 && field.name.as_str() == "_entities"
572 {
573 for ent_sel in field.selection_set.selections.iter() {
574 match ent_sel {
579 Selection::InlineFragment(frag) => {
580 for field_sel in
581 frag.selection_set.selections.iter()
582 {
583 if let Selection::Field(field) = field_sel
584 && field.name.as_str() == field_str
585 {
586 new_sub.selections.extend(
587 field
588 .selection_set
589 .selections
590 .iter()
591 .cloned(),
592 );
593 }
594 }
595 }
596
597 Selection::Field(field) => {
598 if field.name.as_str() == field_str {
599 new_sub.selections.extend(
600 field
601 .selection_set
602 .selections
603 .iter()
604 .cloned(),
605 );
606 }
607 }
608
609 Selection::FragmentSpread(spread) => {
610 if let Some(fragment) = operation
611 .fragments
612 .get(spread.fragment_name.as_str())
613 {
614 for field_sel in
615 fragment.selection_set.selections.iter()
616 {
617 if let Selection::Field(field) = field_sel
618 && field.name.as_str() == field_str
619 {
620 new_sub.selections.extend(
621 field
622 .selection_set
623 .selections
624 .iter()
625 .cloned(),
626 );
627 }
628 }
629 }
630 }
631 }
632 }
633 }
634 }
635 }
636
637 ResponseKey::Entity { .. } => {
638 for selection in op.selection_set.selections.iter() {
639 if let Selection::Field(field) = selection
640 && field.name.as_str() == "_entities"
641 {
642 new_sub
643 .selections
644 .extend(field.selection_set.selections.iter().cloned());
645 }
646 }
647 }
648
649 ResponseKey::BatchEntity { keys, .. } => {
650 new_sub
651 .selections
652 .extend(keys.selection_set.selections.iter().cloned());
653
654 for selection in op.selection_set.selections.iter() {
655 if let Selection::Field(field) = selection
656 && field.name.as_str() == "_entities"
657 {
658 new_sub
659 .selections
660 .extend(field.selection_set.selections.iter().cloned());
661 }
662 }
663 }
664 };
665
666 GraphQLDataMapper::new(operation, subtypes).map_data(&data, &new_sub)
667 } else {
668 data
669 };
670
671 Self::Data {
672 data,
673 key,
674 problems,
675 }
676 }
677
678 (
680 MappedResponse::Error {
681 error,
682 key,
683 problems,
684 },
685 Some(_),
686 ) => MappedResponse::Error {
687 error,
688 key,
689 problems,
690 },
691
692 (mapped, None) => mapped,
694 }
695 }
696}