1use apollo_compiler::ExecutableDocument;
2use apollo_compiler::Name;
3use apollo_compiler::collections::HashMap;
4use apollo_compiler::collections::IndexMap;
5use apollo_compiler::collections::IndexSet;
6use apollo_compiler::executable::Selection;
7use apollo_compiler::executable::SelectionSet;
8use encoding_rs::Encoding;
9use encoding_rs::UTF_8;
10use http::HeaderMap;
11use http::HeaderValue;
12use http::header::CONTENT_LENGTH;
13use http::header::CONTENT_TYPE;
14use http::response::Parts;
15use itertools::Itertools;
16use mime::Mime;
17use serde_json_bytes::ByteString;
18use serde_json_bytes::Map;
19use serde_json_bytes::Value;
20
21use crate::connectors::Connector;
22use crate::connectors::JSONSelection;
23use crate::connectors::ProblemLocation;
24use crate::connectors::runtime::errors::RuntimeError;
25use crate::connectors::runtime::inputs::ContextReader;
26use crate::connectors::runtime::key::ResponseKey;
27use crate::connectors::runtime::mapping::Problem;
28use crate::connectors::runtime::mapping::aggregate_apply_to_errors;
29use crate::connectors::runtime::responses::DeserializeError::ContentDecoding;
30
31const ENTITIES: &str = "_entities";
32const TYPENAME: &str = "__typename";
33
34#[derive(Debug, thiserror::Error)]
35pub enum HandleResponseError {
36 #[error("Merge error: {0}")]
37 MergeError(String),
38}
39
40pub fn deserialize_response(body: &[u8], headers: &HeaderMap) -> Result<Value, DeserializeError> {
42 if body.is_empty()
48 || headers
49 .get(CONTENT_LENGTH)
50 .and_then(|len| len.to_str().ok())
51 .and_then(|s| s.parse::<usize>().ok())
52 .is_some_and(|content_length| content_length == 0)
53 {
54 return Ok(Value::Null);
55 }
56
57 let content_type = headers
58 .get(CONTENT_TYPE)
59 .and_then(|h| h.to_str().ok()?.parse::<Mime>().ok());
60
61 if content_type.is_none()
62 || content_type
63 .as_ref()
64 .is_some_and(|ct| ct.subtype() == mime::JSON || ct.suffix() == Some(mime::JSON))
65 {
66 serde_json::from_slice::<Value>(body).map_err(DeserializeError::SerdeJson)
69 } else if content_type
70 .as_ref()
71 .is_some_and(|ct| ct.type_() == mime::TEXT && ct.subtype() == mime::PLAIN)
72 {
73 let encoding = content_type
76 .as_ref()
77 .and_then(|ct| Encoding::for_label(ct.get_param("charset")?.as_str().as_bytes()))
78 .unwrap_or(UTF_8);
79 let (decoded_body, _, had_errors) = encoding.decode(body);
80
81 if had_errors {
82 return Err(ContentDecoding(encoding.name()));
83 }
84
85 Ok(Value::String(decoded_body.into_owned().into()))
86 } else {
87 Ok(Value::Null)
89 }
90}
91
92#[derive(Debug, thiserror::Error)]
93pub enum DeserializeError {
94 #[error("Could not parse JSON: {0}")]
95 SerdeJson(#[source] serde_json::Error),
96 #[error("Could not decode data with content encoding {0}")]
97 ContentDecoding(&'static str),
98}
99
100pub fn handle_raw_response(
101 data: &Value,
102 parts: &Parts,
103 key: ResponseKey,
104 connector: &Connector,
105 context: impl ContextReader,
106 client_headers: &HeaderMap<HeaderValue>,
107) -> MappedResponse {
108 let inputs = key
109 .inputs()
110 .clone()
111 .merger(&connector.response_variable_keys)
112 .config(connector.config.as_ref())
113 .context(context)
114 .status(parts.status.as_u16())
115 .request(&connector.response_headers, client_headers)
116 .response(&connector.response_headers, Some(parts))
117 .merge();
118 let warnings = Vec::new();
119 let (success, warnings) = is_success(connector, data, parts, &inputs, warnings);
120 if success {
121 map_response(data, key, inputs, warnings)
122 } else {
123 map_error(connector, data, parts, key, inputs, warnings)
124 }
125}
126
127#[derive(Debug, Clone, Copy, PartialEq, Eq)]
128struct GraphQLDataMapper<'a> {
129 doc: &'a ExecutableDocument,
130 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
131}
132
133impl<'a> GraphQLDataMapper<'a> {
134 fn new(
135 doc: &'a ExecutableDocument,
136 subtypes_map: &'a IndexMap<String, IndexSet<String>>,
137 ) -> Self {
138 Self { doc, subtypes_map }
139 }
140
141 fn fragment_matches(&self, data: &Value, fragment_type_condition: &Name) -> bool {
142 if let Some(data_typename) = data.get("__typename") {
143 match data_typename {
144 Value::String(typename) => {
145 self.supertype_has_subtype(fragment_type_condition.as_str(), typename.as_str())
146 }
147 _ => false,
148 }
149 } else {
150 true
151 }
152 }
153
154 fn supertype_has_subtype(&self, supertype: &str, subtype: &str) -> bool {
155 if supertype == subtype {
156 true
157 } else if let Some(subtypes) = self.subtypes_map.get(supertype) {
158 subtypes
159 .iter()
160 .any(|s| self.supertype_has_subtype(s, subtype))
161 } else {
162 false
163 }
164 }
165
166 fn map_data(&self, data: &Value, selection_set: &SelectionSet) -> Value {
167 if selection_set.selections.is_empty() {
168 return data.clone();
169 }
170
171 match data {
172 Value::Object(map) => {
173 let mut new_map = Map::new();
174
175 for field in selection_set.selections.iter() {
176 match field {
177 Selection::Field(field) => {
178 if let Some(field_value) = map.get(field.name.as_str()) {
179 let output_field_name = field.alias.as_ref().unwrap_or(&field.name);
180 new_map.insert(
181 output_field_name.to_string(),
182 self.map_data(field_value, &field.selection_set),
183 );
184 }
185 }
186
187 Selection::FragmentSpread(spread) => {
188 if let Some(fragment) =
189 self.doc.fragments.get(spread.fragment_name.as_str())
190 && self.fragment_matches(data, fragment.type_condition())
191 {
192 let mapped = self.map_data(data, &fragment.selection_set);
193 if let Some(fragment_map) = mapped.as_object() {
194 new_map.extend(fragment_map.clone());
195 }
196 }
197 }
198
199 Selection::InlineFragment(fragment) => {
200 if let Some(type_condition) = &fragment.type_condition
201 && !self.fragment_matches(data, type_condition)
202 {
203 continue;
204 }
205 let mapped = self.map_data(data, &fragment.selection_set);
206 if let Some(fragment_map) = mapped.as_object() {
207 new_map.extend(fragment_map.clone());
208 }
209 }
210 }
211 }
212
213 Value::Object(new_map)
214 }
215
216 Value::Array(items) => Value::Array(
217 items
218 .iter()
219 .map(|item| self.map_data(item, selection_set))
220 .collect(),
221 ),
222
223 primitive => primitive.clone(),
224 }
225 }
226}
227
228fn is_success(
231 connector: &Connector,
232 data: &Value,
233 parts: &Parts,
234 inputs: &IndexMap<String, Value>,
235 mut warnings: Vec<Problem>,
236) -> (bool, Vec<Problem>) {
237 let Some(is_success_selection) = &connector.error_settings.connect_is_success else {
238 return (parts.status.is_success(), warnings);
239 };
240 let (res, apply_to_errors) = is_success_selection.apply_with_vars(data, inputs);
241 warnings.extend(aggregate_apply_to_errors(
242 apply_to_errors,
243 ProblemLocation::IsSuccess,
244 ));
245
246 (
247 res.as_ref().and_then(Value::as_bool).unwrap_or_default(),
248 warnings,
249 )
250}
251
252pub(super) fn map_response(
254 data: &Value,
255 key: ResponseKey,
256 inputs: IndexMap<String, Value>,
257 mut warnings: Vec<Problem>,
258) -> MappedResponse {
259 let (res, apply_to_errors) = key.selection().apply_with_vars(data, &inputs);
260 warnings.extend(aggregate_apply_to_errors(
261 apply_to_errors,
262 ProblemLocation::Selection,
263 ));
264 MappedResponse::Data {
265 key,
266 data: res.unwrap_or_else(|| Value::Null),
267 problems: warnings,
268 }
269}
270
271pub(super) fn map_error(
273 connector: &Connector,
274 data: &Value,
275 parts: &Parts,
276 key: ResponseKey,
277 inputs: IndexMap<String, Value>,
278 mut warnings: Vec<Problem>,
279) -> MappedResponse {
280 let message = if let Some(message_selection) = &connector.error_settings.message {
282 let (res, apply_to_errors) = message_selection.apply_with_vars(data, &inputs);
283 warnings.extend(aggregate_apply_to_errors(
284 apply_to_errors,
285 ProblemLocation::ErrorsMessage,
286 ));
287 res.as_ref()
288 .and_then(Value::as_str)
289 .unwrap_or_default()
290 .to_string()
291 } else {
292 "Request failed".to_string()
293 };
294
295 let mut error = RuntimeError::new(message, &key);
297 error.subgraph_name = Some(connector.id.subgraph_name.clone());
298 error.coordinate = Some(connector.id.coordinate());
299
300 error = error.extension(
302 "http",
303 Value::Object(Map::from_iter([(
304 "status".into(),
305 Value::Number(parts.status.as_u16().into()),
306 )])),
307 );
308
309 let mut extension_code = "CONNECTOR_FETCH".to_string();
314 if let Some(extensions_selection) = &connector.error_settings.source_extensions {
315 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
316 warnings.extend(aggregate_apply_to_errors(
317 apply_to_errors,
318 ProblemLocation::SourceErrorsExtensions,
319 ));
320
321 let extensions = res
323 .and_then(|e| match e {
324 Value::Object(map) => Some(map),
325 _ => None,
326 })
327 .unwrap_or_default();
328
329 if let Some(code) = extensions.get("code") {
330 extension_code = code.as_str().unwrap_or_default().to_string();
331 }
332
333 for (key, value) in extensions {
334 error = error.extension(key, value);
335 }
336 }
337
338 if let Some(extensions_selection) = &connector.error_settings.connect_extensions {
339 let (res, apply_to_errors) = extensions_selection.apply_with_vars(data, &inputs);
340 warnings.extend(aggregate_apply_to_errors(
341 apply_to_errors,
342 ProblemLocation::ConnectErrorsExtensions,
343 ));
344
345 let extensions = res
347 .and_then(|e| match e {
348 Value::Object(map) => Some(map),
349 _ => None,
350 })
351 .unwrap_or_default();
352
353 if let Some(code) = extensions.get("code") {
354 extension_code = code.as_str().unwrap_or_default().to_string();
355 }
356
357 for (key, value) in extensions {
358 error = error.extension(key, value);
359 }
360 }
361
362 error = error.with_code(extension_code);
363
364 MappedResponse::Error {
365 error,
366 key,
367 problems: warnings,
368 }
369}
370#[derive(Debug)]
372pub enum MappedResponse {
373 Error {
376 error: RuntimeError,
377 key: ResponseKey,
378 problems: Vec<Problem>,
379 },
380 Data {
382 data: Value,
383 key: ResponseKey,
384 problems: Vec<Problem>,
385 },
386}
387
388impl MappedResponse {
389 pub fn add_to_data(
393 self,
394 data: &mut Map<ByteString, Value>,
395 errors: &mut Vec<RuntimeError>,
396 count: usize,
397 ) -> Result<(), HandleResponseError> {
398 match self {
399 Self::Error { error, key, .. } => {
400 match key {
401 ResponseKey::Entity { index, .. } | ResponseKey::EntityField { index, .. } => {
403 let entities = data
404 .entry(ENTITIES)
405 .or_insert(Value::Array(Vec::with_capacity(count)));
406 entities
407 .as_array_mut()
408 .ok_or_else(|| {
409 HandleResponseError::MergeError("_entities is not an array".into())
410 })?
411 .insert(index, Value::Null);
412 }
413 _ => {}
414 };
415 errors.push(error);
416 }
417 Self::Data {
418 data: value, key, ..
419 } => match key {
420 ResponseKey::RootField { ref name, .. } => {
421 data.insert(name.clone(), value);
422 }
423 ResponseKey::Entity { index, .. } => {
424 let entities = data
425 .entry(ENTITIES)
426 .or_insert(Value::Array(Vec::with_capacity(count)));
427 entities
428 .as_array_mut()
429 .ok_or_else(|| {
430 HandleResponseError::MergeError("_entities is not an array".into())
431 })?
432 .insert(index, value);
433 }
434 ResponseKey::EntityField {
435 index,
436 ref field_name,
437 ref typename,
438 ..
439 } => {
440 let entities = data
441 .entry(ENTITIES)
442 .or_insert(Value::Array(Vec::with_capacity(count)))
443 .as_array_mut()
444 .ok_or_else(|| {
445 HandleResponseError::MergeError("_entities is not an array".into())
446 })?;
447
448 match entities.get_mut(index) {
449 Some(Value::Object(entity)) => {
450 entity.insert(field_name.clone(), value);
451 }
452 _ => {
453 let mut entity = Map::new();
454 if let Some(typename) = typename {
455 entity.insert(TYPENAME, Value::String(typename.as_str().into()));
456 }
457 entity.insert(field_name.clone(), value);
458 entities.insert(index, Value::Object(entity));
459 }
460 };
461 }
462 ResponseKey::BatchEntity {
463 selection,
464 keys,
465 inputs,
466 } => {
467 let Value::Array(values) = value else {
468 return Err(HandleResponseError::MergeError(
469 "Response for a batch request does not map to an array".into(),
470 ));
471 };
472
473 let spec = selection.spec();
474 let key_selection = JSONSelection::parse_with_spec(
475 &keys.serialize().no_indent().to_string(),
476 spec,
477 )
478 .map_err(|e| HandleResponseError::MergeError(e.to_string()))?;
479
480 let key_values = inputs.batch.iter().map(|v| {
482 key_selection
483 .apply_to(&Value::Object(v.clone()))
484 .0
485 .unwrap_or(Value::Null)
486 });
487
488 let mut map = values
490 .into_iter()
491 .filter_map(|v| key_selection.apply_to(&v).0.map(|key| (key, v)))
492 .collect::<HashMap<_, _>>();
493
494 let new_entities = key_values
496 .map(|key| map.remove(&key).unwrap_or(Value::Null))
497 .collect_vec();
498
499 let entities = data
501 .entry(ENTITIES)
502 .or_insert(Value::Array(Vec::with_capacity(count)));
503
504 entities
505 .as_array_mut()
506 .ok_or_else(|| {
507 HandleResponseError::MergeError("_entities is not an array".into())
508 })?
509 .extend(new_entities);
510 }
511 },
512 }
513
514 Ok(())
515 }
516
517 pub fn problems(&self) -> &[Problem] {
518 match self {
519 Self::Error { problems, .. } | Self::Data { problems, .. } => problems,
520 }
521 }
522
523 pub fn apply_operation(
536 self, operation_option: Option<&ExecutableDocument>,
538 subtypes: &IndexMap<String, IndexSet<String>>,
539 ) -> Self {
540 match (self, operation_option) {
541 (
542 Self::Data {
543 data,
544 key,
545 problems,
546 },
547 Some(operation),
548 ) => {
549 let single_op = operation
550 .operations
551 .anonymous
552 .as_ref()
553 .or_else(|| operation.operations.named.values().next());
554
555 let data = if let Some(op) = single_op {
556 let mut new_sub = SelectionSet::new(op.selection_set.ty.clone());
557
558 match &key {
559 ResponseKey::RootField { name, .. } => {
560 for field in op.selection_set.selections.iter() {
561 if let Selection::Field(field) = field
562 && field.name.as_str() == name.as_str()
563 {
564 new_sub
565 .selections
566 .extend(field.selection_set.selections.iter().cloned());
567 }
568 }
569 }
570
571 ResponseKey::EntityField { field_name, .. } => {
572 let field_str = field_name.as_str();
573
574 for selection in op.selection_set.selections.iter() {
575 if let Selection::Field(field) = selection
576 && field.name.as_str() == "_entities"
577 {
578 for ent_sel in field.selection_set.selections.iter() {
579 match ent_sel {
584 Selection::InlineFragment(frag) => {
585 for field_sel in
586 frag.selection_set.selections.iter()
587 {
588 if let Selection::Field(field) = field_sel
589 && field.name.as_str() == field_str
590 {
591 new_sub.selections.extend(
592 field
593 .selection_set
594 .selections
595 .iter()
596 .cloned(),
597 );
598 }
599 }
600 }
601
602 Selection::Field(field) => {
603 if field.name.as_str() == field_str {
604 new_sub.selections.extend(
605 field
606 .selection_set
607 .selections
608 .iter()
609 .cloned(),
610 );
611 }
612 }
613
614 Selection::FragmentSpread(spread) => {
615 if let Some(fragment) = operation
616 .fragments
617 .get(spread.fragment_name.as_str())
618 {
619 for field_sel in
620 fragment.selection_set.selections.iter()
621 {
622 if let Selection::Field(field) = field_sel
623 && field.name.as_str() == field_str
624 {
625 new_sub.selections.extend(
626 field
627 .selection_set
628 .selections
629 .iter()
630 .cloned(),
631 );
632 }
633 }
634 }
635 }
636 }
637 }
638 }
639 }
640 }
641
642 ResponseKey::Entity { .. } => {
643 for selection in op.selection_set.selections.iter() {
644 if let Selection::Field(field) = selection
645 && field.name.as_str() == "_entities"
646 {
647 new_sub
648 .selections
649 .extend(field.selection_set.selections.iter().cloned());
650 }
651 }
652 }
653
654 ResponseKey::BatchEntity { keys, .. } => {
655 new_sub
656 .selections
657 .extend(keys.selection_set.selections.iter().cloned());
658
659 for selection in op.selection_set.selections.iter() {
660 if let Selection::Field(field) = selection
661 && field.name.as_str() == "_entities"
662 {
663 new_sub
664 .selections
665 .extend(field.selection_set.selections.iter().cloned());
666 }
667 }
668 }
669 };
670
671 GraphQLDataMapper::new(operation, subtypes).map_data(&data, &new_sub)
672 } else {
673 data
674 };
675
676 Self::Data {
677 data,
678 key,
679 problems,
680 }
681 }
682
683 (
685 MappedResponse::Error {
686 error,
687 key,
688 problems,
689 },
690 Some(_),
691 ) => MappedResponse::Error {
692 error,
693 key,
694 problems,
695 },
696
697 (mapped, None) => mapped,
699 }
700 }
701}
702
#[cfg(test)]
mod tests {
    use http::HeaderMap;
    use http::HeaderValue;
    use http::header::HeaderName;
    use serde_json_bytes::Value;

    use super::deserialize_response;

    /// Build a header map from `(name, value)` pairs; a repeated name keeps
    /// the last value. Panics on an invalid name or value.
    fn headers_with(pairs: &[(&str, &str)]) -> HeaderMap {
        let mut headers = HeaderMap::new();
        for &(name, value) in pairs {
            let name = HeaderName::from_bytes(name.as_bytes()).unwrap();
            let value = HeaderValue::from_str(value).unwrap();
            headers.insert(name, value);
        }
        headers
    }

    #[test]
    fn empty_body_no_content_length_returns_null() {
        let decoded = deserialize_response(b"", &HeaderMap::new()).unwrap();
        assert_eq!(decoded, Value::Null);
    }

    #[test]
    fn empty_body_with_content_length_zero_returns_null() {
        let headers = headers_with(&[("content-length", "0")]);
        let decoded = deserialize_response(b"", &headers).unwrap();
        assert_eq!(decoded, Value::Null);
    }
}