1mod headers;
2mod http_json_transport;
3mod keys;
4mod problem_location;
5mod source;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use apollo_compiler::Name;
11use apollo_compiler::Schema;
12use apollo_compiler::collections::HashSet;
13use apollo_compiler::collections::IndexMap;
14use apollo_compiler::collections::IndexSet;
15use apollo_compiler::executable::FieldSet;
16use apollo_compiler::schema::ExtendedType;
17use apollo_compiler::validation::Valid;
18use keys::make_key_field_set_from_variables;
19use serde_json::Value;
20
21pub use self::headers::Header;
22pub(crate) use self::headers::HeaderParseError;
23pub use self::headers::HeaderSource;
24pub use self::headers::OriginatingDirective;
25pub use self::http_json_transport::HTTPMethod;
26pub use self::http_json_transport::HttpJsonTransport;
27pub use self::http_json_transport::MakeUriError;
28pub use self::problem_location::ProblemLocation;
29pub use self::source::SourceName;
30use super::ConnectId;
31use super::JSONSelection;
32use super::PathSelection;
33use super::id::ConnectorPosition;
34use super::json_selection::VarPaths;
35use super::spec::connect::ConnectBatchArguments;
36use super::spec::connect::ConnectDirectiveArguments;
37use super::spec::errors::ErrorsArguments;
38use super::spec::source::SourceDirectiveArguments;
39use super::variable::Namespace;
40use super::variable::VariableReference;
41use crate::connectors::ConnectSpec;
42use crate::connectors::spec::ConnectLink;
43use crate::connectors::spec::extract_connect_directive_arguments;
44use crate::connectors::spec::extract_source_directive_arguments;
45use crate::error::FederationError;
46use crate::error::SingleFederationError;
47use crate::internal_error;
48
49#[derive(Debug, Clone)]
52pub struct Connector {
53 pub id: ConnectId,
54 pub transport: HttpJsonTransport,
55 pub selection: JSONSelection,
56 pub config: Option<CustomConfiguration>,
57 pub max_requests: Option<usize>,
58
59 pub entity_resolver: Option<EntityResolver>,
61 pub spec: ConnectSpec,
63
64 pub schema_subtypes_map: IndexMap<String, IndexSet<String>>,
66
67 pub request_headers: HashSet<String>,
69 pub response_headers: HashSet<String>,
71 pub request_variable_keys: IndexMap<Namespace, IndexSet<String>>,
73 pub response_variable_keys: IndexMap<Namespace, IndexSet<String>>,
74
75 pub batch_settings: Option<ConnectBatchArguments>,
76
77 pub error_settings: ConnectorErrorsSettings,
78
79 pub label: Label,
81}
82
83#[derive(Debug, Clone, Default)]
84pub struct ConnectorErrorsSettings {
85 pub message: Option<JSONSelection>,
86 pub source_extensions: Option<JSONSelection>,
87 pub connect_extensions: Option<JSONSelection>,
88 pub connect_is_success: Option<JSONSelection>,
89}
90
91impl ConnectorErrorsSettings {
92 fn from_directive(
93 connect_errors: Option<&ErrorsArguments>,
94 source_errors: Option<&ErrorsArguments>,
95 connect_is_success: Option<&JSONSelection>,
96 ) -> Self {
97 let message = connect_errors
98 .and_then(|e| e.message.as_ref())
99 .or_else(|| source_errors.and_then(|e| e.message.as_ref()))
100 .cloned();
101 let source_extensions = source_errors.and_then(|e| e.extensions.as_ref()).cloned();
102 let connect_extensions = connect_errors.and_then(|e| e.extensions.as_ref()).cloned();
103 let connect_is_success = connect_is_success.cloned();
104 Self {
105 message,
106 source_extensions,
107 connect_extensions,
108 connect_is_success,
109 }
110 }
111
112 pub fn variable_references(&self) -> impl Iterator<Item = VariableReference<Namespace>> + '_ {
113 self.message
114 .as_ref()
115 .into_iter()
116 .flat_map(|m| m.variable_references())
117 .chain(
118 self.source_extensions
119 .as_ref()
120 .into_iter()
121 .flat_map(|m| m.variable_references()),
122 )
123 .chain(
124 self.connect_extensions
125 .as_ref()
126 .into_iter()
127 .flat_map(|m| m.variable_references()),
128 )
129 .chain(
130 self.connect_is_success
131 .as_ref()
132 .into_iter()
133 .flat_map(|m| m.variable_references()),
134 )
135 }
136}
137
138pub type CustomConfiguration = Arc<HashMap<String, Value>>;
139
140#[derive(Debug, Clone, PartialEq, Eq)]
146pub enum EntityResolver {
147 Explicit,
149
150 Implicit,
152
153 TypeBatch,
155
156 TypeSingle,
158}
159
160impl Connector {
161 pub fn from_schema(schema: &Schema, subgraph_name: &str) -> Result<Vec<Self>, FederationError> {
168 let Some(link) = ConnectLink::new(schema) else {
169 return Ok(Default::default());
170 };
171 let link = link.map_err(|message| SingleFederationError::UnknownLinkVersion {
172 message: message.message,
173 })?;
174
175 let source_arguments =
176 extract_source_directive_arguments(schema, &link.source_directive_name)?;
177
178 let connect_arguments =
179 extract_connect_directive_arguments(schema, &link.connect_directive_name)?;
180
181 connect_arguments
182 .into_iter()
183 .map(|args| {
184 Self::from_directives(schema, subgraph_name, link.spec, args, &source_arguments)
185 })
186 .collect::<Result<Vec<_>, _>>()
187 }
188
189 fn from_directives(
190 schema: &Schema,
191 subgraph_name: &str,
192 spec: ConnectSpec,
193 connect: ConnectDirectiveArguments,
194 source_arguments: &[SourceDirectiveArguments],
195 ) -> Result<Self, FederationError> {
196 let source = connect
197 .source
198 .and_then(|name| source_arguments.iter().find(|s| s.name == name));
199 let source_name = source.map(|s| s.name.clone());
200
201 let connect_http = connect
203 .http
204 .ok_or_else(|| internal_error!("@connect(http:) missing"))?;
205 let source_http = source.map(|s| &s.http);
206 let transport = HttpJsonTransport::from_directive(connect_http, source_http, spec)?;
207
208 let batch_settings = connect.batch;
210 let connect_errors = connect.errors.as_ref();
211 let source_errors = source.and_then(|s| s.errors.as_ref());
212 let is_success = connect
214 .is_success
215 .as_ref()
216 .or_else(|| source.and_then(|s| s.is_success.as_ref()));
217
218 let error_settings =
219 ConnectorErrorsSettings::from_directive(connect_errors, source_errors, is_success);
220
221 let request_references: IndexSet<VariableReference<Namespace>> =
223 transport.variable_references().collect();
224
225 let response_references: IndexSet<VariableReference<Namespace>> = connect
227 .selection
228 .variable_references()
229 .chain(error_settings.variable_references())
230 .collect();
231
232 let request_variable_keys = extract_variable_key_references(request_references.iter());
235 let response_variable_keys = extract_variable_key_references(response_references.iter());
236
237 let request_headers = extract_header_references(&request_references); let response_headers = extract_header_references(&response_references); let entity_resolver = determine_entity_resolver(
243 &connect.position,
244 connect.entity,
245 schema,
246 &request_variable_keys,
247 );
248 let label = Label::new(
249 subgraph_name,
250 source_name.as_ref(),
251 &transport,
252 entity_resolver.as_ref(),
253 );
254 let id = ConnectId {
255 subgraph_name: subgraph_name.to_string(),
256 source_name,
257 named: connect.connector_id,
258 directive: connect.position,
259 };
260
261 Ok(Connector {
262 id,
263 transport,
264 selection: connect.selection,
265 entity_resolver,
266 config: None,
267 max_requests: None,
268 spec,
269 schema_subtypes_map: Connector::subtypes_map_from_schema(schema),
270 request_headers,
271 response_headers,
272 request_variable_keys,
273 response_variable_keys,
274 batch_settings,
275 error_settings,
276 label,
277 })
278 }
279
280 pub fn subtypes_map_from_schema(schema: &Schema) -> IndexMap<String, IndexSet<String>> {
281 let mut subtypes_map: IndexMap<String, IndexSet<String>> = IndexMap::default();
282
283 for (name, ty) in schema.types.iter() {
285 match ty {
286 ExtendedType::Object(o) => {
287 for supertype in &o.implements_interfaces {
288 subtypes_map
289 .entry(supertype.to_string())
290 .or_default()
291 .insert(name.to_string());
292 }
293 }
294 ExtendedType::Interface(i) => {
295 for supertype in &i.implements_interfaces {
296 subtypes_map
297 .entry(supertype.to_string())
298 .or_default()
299 .insert(name.to_string());
300 }
301 }
302 ExtendedType::Union(u) => {
303 for member in &u.members {
304 subtypes_map
305 .entry(u.name.to_string())
306 .or_default()
307 .insert(member.to_string());
308 }
309 }
310 _ => {
311 }
313 }
314 }
315
316 subtypes_map
317 }
318
319 pub(crate) fn variable_references(&self) -> impl Iterator<Item = VariableReference<Namespace>> {
320 self.transport.variable_references().chain(
321 self.selection
322 .external_var_paths()
323 .into_iter()
324 .flat_map(PathSelection::variable_reference),
325 )
326 }
327
328 pub fn resolvable_key(&self, schema: &Schema) -> Result<Option<Valid<FieldSet>>, String> {
330 match &self.entity_resolver {
331 None => Ok(None),
332 Some(EntityResolver::Explicit) => {
333 make_key_field_set_from_variables(
334 schema,
335 &self.id.directive.base_type_name(schema).ok_or_else(|| {
336 format!("Missing field {}", self.id.directive.coordinate())
337 })?,
338 self.variable_references(),
339 Namespace::Args,
340 )
341 }
342 Some(EntityResolver::Implicit) => {
343 make_key_field_set_from_variables(
344 schema,
345 &self.id.directive.parent_type_name().ok_or_else(|| {
346 format!("Missing type {}", self.id.directive.coordinate())
347 })?,
348 self.variable_references(),
349 Namespace::This,
350 )
351 }
352 Some(EntityResolver::TypeBatch) => {
353 make_key_field_set_from_variables(
354 schema,
355 &self.id.directive.base_type_name(schema).ok_or_else(|| {
356 format!("Missing type {}", self.id.directive.coordinate())
357 })?,
358 self.variable_references(),
359 Namespace::Batch,
360 )
361 }
362 Some(EntityResolver::TypeSingle) => {
363 make_key_field_set_from_variables(
364 schema,
365 &self.id.directive.base_type_name(schema).ok_or_else(|| {
366 format!("Missing type {}", self.id.directive.coordinate())
367 })?,
368 self.variable_references(),
369 Namespace::This,
370 )
371 }
372 }
373 .map_err(|_| {
374 format!(
375 "Failed to create key for connector {}",
376 self.id.coordinate()
377 )
378 })
379 }
380
381 pub fn source_config_key(&self) -> String {
385 if let Some(source_name) = &self.id.source_name {
386 format!("{}.{}", self.id.subgraph_name, source_name)
387 } else {
388 format!("{}.{}", self.id.subgraph_name, self.id.synthetic_name())
389 }
390 }
391
392 pub fn name(&self) -> Name {
396 match &self.id.directive {
397 ConnectorPosition::Field(field_position) => field_position.directive_name.clone(),
398 ConnectorPosition::Type(type_position) => type_position.directive_name.clone(),
399 }
400 }
401
402 pub fn id(&self) -> String {
404 self.id.name()
405 }
406
407 pub fn abstract_types(&self) -> IndexSet<String> {
409 self.schema_subtypes_map.keys().cloned().collect()
410 }
411}
412
413#[derive(Debug, Clone)]
415pub struct Label(pub String);
416
417impl Label {
418 fn new(
419 subgraph_name: &str,
420 source: Option<&SourceName>,
421 transport: &HttpJsonTransport,
422 entity_resolver: Option<&EntityResolver>,
423 ) -> Self {
424 let source = source.map(SourceName::as_str).unwrap_or_default();
425 let batch = match entity_resolver {
426 Some(EntityResolver::TypeBatch) => "[BATCH] ",
427 _ => "",
428 };
429 Self(format!(
430 "{batch}{subgraph_name}.{source} {}",
431 transport.label()
432 ))
433 }
434}
435
436impl From<&str> for Label {
437 fn from(label: &str) -> Self {
438 Self(label.to_string())
439 }
440}
441
442impl AsRef<str> for Label {
443 fn as_ref(&self) -> &str {
444 &self.0
445 }
446}
447
448fn determine_entity_resolver(
449 position: &ConnectorPosition,
450 entity: bool,
451 schema: &Schema,
452 request_variables: &IndexMap<Namespace, IndexSet<String>>,
453) -> Option<EntityResolver> {
454 match position {
455 ConnectorPosition::Field(_) => {
456 match (entity, position.on_root_type(schema)) {
457 (true, _) => Some(EntityResolver::Explicit), (_, false) => Some(EntityResolver::Implicit), _ => None,
460 }
461 }
462 ConnectorPosition::Type(_) => {
463 if request_variables.contains_key(&Namespace::Batch) {
464 Some(EntityResolver::TypeBatch) } else {
466 Some(EntityResolver::TypeSingle) }
468 }
469 }
470}
471
472fn extract_header_references(
474 variable_references: &IndexSet<VariableReference<Namespace>>,
475) -> HashSet<String> {
476 variable_references
477 .iter()
478 .flat_map(|var_ref| {
479 if var_ref.namespace.namespace != Namespace::Request
480 && var_ref.namespace.namespace != Namespace::Response
481 {
482 Vec::new()
483 } else {
484 var_ref
485 .selection
486 .get("headers")
487 .map(|headers_subtrie| headers_subtrie.keys().cloned().collect())
488 .unwrap_or_default()
489 }
490 })
491 .collect()
492}
493
494fn extract_variable_key_references<'a>(
497 references: impl Iterator<Item = &'a VariableReference<Namespace>>,
498) -> IndexMap<Namespace, IndexSet<String>> {
499 let mut variable_keys: IndexMap<Namespace, IndexSet<String>> = IndexMap::default();
500
501 for var_ref in references {
502 let set = variable_keys
504 .entry(var_ref.namespace.namespace)
505 .or_default();
506
507 for key in var_ref.selection.keys() {
508 set.insert(key.to_string());
509 }
510 }
511
512 variable_keys
513}
514
515#[cfg(test)]
516mod tests {
517 use apollo_compiler::Schema;
518 use insta::assert_debug_snapshot;
519
520 use super::*;
521 use crate::ValidFederationSubgraphs;
522 use crate::schema::FederationSchema;
523 use crate::supergraph::extract_subgraphs_from_supergraph;
524
525 static SIMPLE_SUPERGRAPH: &str = include_str!("./tests/schemas/simple.graphql");
526 static SIMPLE_SUPERGRAPH_V0_2: &str = include_str!("./tests/schemas/simple_v0_2.graphql");
527
528 fn get_subgraphs(supergraph_sdl: &str) -> ValidFederationSubgraphs {
529 let schema = Schema::parse(supergraph_sdl, "supergraph.graphql").unwrap();
530 let supergraph_schema = FederationSchema::new(schema).unwrap();
531 extract_subgraphs_from_supergraph(&supergraph_schema, Some(true)).unwrap()
532 }
533
534 #[test]
535 fn test_from_schema() {
536 let subgraphs = get_subgraphs(SIMPLE_SUPERGRAPH);
537 let subgraph = subgraphs.get("connectors").unwrap();
538 let connectors = Connector::from_schema(subgraph.schema.schema(), "connectors").unwrap();
539 assert_debug_snapshot!(&connectors, @r###"
540 [
541 Connector {
542 id: ConnectId {
543 subgraph_name: "connectors",
544 source_name: Some(
545 "json",
546 ),
547 named: None,
548 directive: Field(
549 ObjectOrInterfaceFieldDirectivePosition {
550 field: Object(Query.users),
551 directive_name: "connect",
552 directive_index: 0,
553 },
554 ),
555 },
556 transport: HttpJsonTransport {
557 source_template: Some(
558 StringTemplate {
559 parts: [
560 Constant(
561 Constant {
562 value: "https://jsonplaceholder.typicode.com/",
563 location: 0..37,
564 },
565 ),
566 ],
567 },
568 ),
569 connect_template: StringTemplate {
570 parts: [
571 Constant(
572 Constant {
573 value: "/users",
574 location: 0..6,
575 },
576 ),
577 ],
578 },
579 method: Get,
580 headers: [
581 Header {
582 name: "authtoken",
583 source: From(
584 "x-auth-token",
585 ),
586 },
587 Header {
588 name: "user-agent",
589 source: Value(
590 HeaderValue(
591 StringTemplate {
592 parts: [
593 Constant(
594 Constant {
595 value: "Firefox",
596 location: 0..7,
597 },
598 ),
599 ],
600 },
601 ),
602 ),
603 },
604 ],
605 body: None,
606 source_path: None,
607 source_query_params: None,
608 connect_path: None,
609 connect_query_params: None,
610 },
611 selection: JSONSelection {
612 inner: Named(
613 SubSelection {
614 selections: [
615 NamedSelection {
616 prefix: None,
617 path: PathSelection {
618 path: WithRange {
619 node: Key(
620 WithRange {
621 node: Field(
622 "id",
623 ),
624 range: Some(
625 0..2,
626 ),
627 },
628 WithRange {
629 node: Empty,
630 range: Some(
631 2..2,
632 ),
633 },
634 ),
635 range: Some(
636 0..2,
637 ),
638 },
639 },
640 },
641 NamedSelection {
642 prefix: None,
643 path: PathSelection {
644 path: WithRange {
645 node: Key(
646 WithRange {
647 node: Field(
648 "name",
649 ),
650 range: Some(
651 3..7,
652 ),
653 },
654 WithRange {
655 node: Empty,
656 range: Some(
657 7..7,
658 ),
659 },
660 ),
661 range: Some(
662 3..7,
663 ),
664 },
665 },
666 },
667 ],
668 range: Some(
669 0..7,
670 ),
671 },
672 ),
673 spec: V0_1,
674 },
675 config: None,
676 max_requests: None,
677 entity_resolver: None,
678 spec: V0_1,
679 schema_subtypes_map: {
680 "_Entity": {
681 "User",
682 },
683 },
684 request_headers: {},
685 response_headers: {},
686 request_variable_keys: {},
687 response_variable_keys: {},
688 batch_settings: None,
689 error_settings: ConnectorErrorsSettings {
690 message: None,
691 source_extensions: None,
692 connect_extensions: None,
693 connect_is_success: None,
694 },
695 label: Label(
696 "connectors.json http: GET /users",
697 ),
698 },
699 Connector {
700 id: ConnectId {
701 subgraph_name: "connectors",
702 source_name: Some(
703 "json",
704 ),
705 named: None,
706 directive: Field(
707 ObjectOrInterfaceFieldDirectivePosition {
708 field: Object(Query.posts),
709 directive_name: "connect",
710 directive_index: 0,
711 },
712 ),
713 },
714 transport: HttpJsonTransport {
715 source_template: Some(
716 StringTemplate {
717 parts: [
718 Constant(
719 Constant {
720 value: "https://jsonplaceholder.typicode.com/",
721 location: 0..37,
722 },
723 ),
724 ],
725 },
726 ),
727 connect_template: StringTemplate {
728 parts: [
729 Constant(
730 Constant {
731 value: "/posts",
732 location: 0..6,
733 },
734 ),
735 ],
736 },
737 method: Get,
738 headers: [
739 Header {
740 name: "authtoken",
741 source: From(
742 "x-auth-token",
743 ),
744 },
745 Header {
746 name: "user-agent",
747 source: Value(
748 HeaderValue(
749 StringTemplate {
750 parts: [
751 Constant(
752 Constant {
753 value: "Firefox",
754 location: 0..7,
755 },
756 ),
757 ],
758 },
759 ),
760 ),
761 },
762 ],
763 body: None,
764 source_path: None,
765 source_query_params: None,
766 connect_path: None,
767 connect_query_params: None,
768 },
769 selection: JSONSelection {
770 inner: Named(
771 SubSelection {
772 selections: [
773 NamedSelection {
774 prefix: None,
775 path: PathSelection {
776 path: WithRange {
777 node: Key(
778 WithRange {
779 node: Field(
780 "id",
781 ),
782 range: Some(
783 0..2,
784 ),
785 },
786 WithRange {
787 node: Empty,
788 range: Some(
789 2..2,
790 ),
791 },
792 ),
793 range: Some(
794 0..2,
795 ),
796 },
797 },
798 },
799 NamedSelection {
800 prefix: None,
801 path: PathSelection {
802 path: WithRange {
803 node: Key(
804 WithRange {
805 node: Field(
806 "title",
807 ),
808 range: Some(
809 3..8,
810 ),
811 },
812 WithRange {
813 node: Empty,
814 range: Some(
815 8..8,
816 ),
817 },
818 ),
819 range: Some(
820 3..8,
821 ),
822 },
823 },
824 },
825 NamedSelection {
826 prefix: None,
827 path: PathSelection {
828 path: WithRange {
829 node: Key(
830 WithRange {
831 node: Field(
832 "body",
833 ),
834 range: Some(
835 9..13,
836 ),
837 },
838 WithRange {
839 node: Empty,
840 range: Some(
841 13..13,
842 ),
843 },
844 ),
845 range: Some(
846 9..13,
847 ),
848 },
849 },
850 },
851 ],
852 range: Some(
853 0..13,
854 ),
855 },
856 ),
857 spec: V0_1,
858 },
859 config: None,
860 max_requests: None,
861 entity_resolver: None,
862 spec: V0_1,
863 schema_subtypes_map: {
864 "_Entity": {
865 "User",
866 },
867 },
868 request_headers: {},
869 response_headers: {},
870 request_variable_keys: {},
871 response_variable_keys: {},
872 batch_settings: None,
873 error_settings: ConnectorErrorsSettings {
874 message: None,
875 source_extensions: None,
876 connect_extensions: None,
877 connect_is_success: None,
878 },
879 label: Label(
880 "connectors.json http: GET /posts",
881 ),
882 },
883 ]
884 "###);
885 }
886
887 #[test]
888 fn test_from_schema_v0_2() {
889 let subgraphs = get_subgraphs(SIMPLE_SUPERGRAPH_V0_2);
890 let subgraph = subgraphs.get("connectors").unwrap();
891 let connectors = Connector::from_schema(subgraph.schema.schema(), "connectors").unwrap();
892 assert_debug_snapshot!(&connectors);
893 }
894}