1mod headers;
2mod http_json_transport;
3mod keys;
4mod problem_location;
5mod source;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use apollo_compiler::Name;
11use apollo_compiler::Schema;
12use apollo_compiler::collections::HashSet;
13use apollo_compiler::collections::IndexMap;
14use apollo_compiler::collections::IndexSet;
15use apollo_compiler::executable::FieldSet;
16use apollo_compiler::schema::ExtendedType;
17use apollo_compiler::validation::Valid;
18use keys::make_key_field_set_from_variables;
19use serde_json::Value;
20use shape::Shape;
21
22pub use self::headers::Header;
23pub(crate) use self::headers::HeaderParseError;
24pub use self::headers::HeaderSource;
25pub use self::headers::OriginatingDirective;
26pub use self::http_json_transport::HTTPMethod;
27pub use self::http_json_transport::HttpJsonTransport;
28pub use self::http_json_transport::MakeUriError;
29pub use self::problem_location::ProblemLocation;
30pub use self::source::SourceName;
31use super::ConnectId;
32use super::JSONSelection;
33use super::PathSelection;
34use super::id::ConnectorPosition;
35use super::json_selection::VarPaths;
36use super::spec::connect::ConnectBatchArguments;
37use super::spec::connect::ConnectDirectiveArguments;
38use super::spec::errors::ErrorsArguments;
39use super::spec::source::SourceDirectiveArguments;
40use super::variable::Namespace;
41use super::variable::VariableReference;
42use crate::connectors::ConnectSpec;
43use crate::connectors::spec::ConnectLink;
44use crate::connectors::spec::extract_connect_directive_arguments;
45use crate::connectors::spec::extract_source_directive_arguments;
46use crate::error::FederationError;
47use crate::error::SingleFederationError;
48
/// A fully resolved connector, built by [`Connector::from_schema`] from one set of connect
/// directive arguments (optionally combined with the source directive it names).
#[derive(Debug, Clone)]
pub struct Connector {
    /// Identity of the connector: subgraph name, optional source name, optional explicit
    /// connector id, and the position of the directive in the schema.
    pub id: ConnectId,
    /// HTTP/JSON transport for the connector; `None` when the connect directive carried no
    /// `http` argument.
    pub transport: Option<HttpJsonTransport>,
    /// The selection taken from the connect directive arguments.
    pub selection: JSONSelection,
    /// Custom configuration. `from_directives` always leaves this as `None`; presumably it is
    /// filled in later by the caller — TODO confirm.
    pub config: Option<CustomConfiguration>,
    /// Request limit. `from_directives` always leaves this as `None`; presumably it is filled
    /// in later by the caller — TODO confirm.
    pub max_requests: Option<usize>,

    /// How this connector resolves entities, as decided by `determine_entity_resolver`;
    /// `None` when it is not an entity resolver.
    pub entity_resolver: Option<EntityResolver>,
    /// The connect spec version obtained from the schema's connect link.
    pub spec: ConnectSpec,

    /// Map from a supertype name (implemented interface or union) to the names of its
    /// subtypes, computed by [`Connector::subtypes_map_from_schema`].
    pub schema_subtypes_map: IndexMap<String, IndexSet<String>>,

    /// Header names found under `headers` in the transport's variable references that use the
    /// `Request` or `Response` namespace.
    pub request_headers: HashSet<String>,
    /// Header names found under `headers` in the selection's and error settings' variable
    /// references that use the `Request` or `Response` namespace.
    pub response_headers: HashSet<String>,
    /// Per namespace, the keys referenced by the transport's variable references.
    pub request_variable_keys: IndexMap<Namespace, IndexSet<String>>,
    /// Per namespace, the keys referenced by the selection's and error settings' variable
    /// references.
    pub response_variable_keys: IndexMap<Namespace, IndexSet<String>>,

    /// Batch settings copied from the connect directive arguments, if any.
    pub batch_settings: Option<ConnectBatchArguments>,

    /// Error-mapping settings merged from the connect directive and its source.
    pub error_settings: ConnectorErrorsSettings,

    /// Human-readable label built by `Label::new`.
    pub label: Label,
}
83
/// Error-mapping selections for a connector, merged from the connect-level and source-level
/// error arguments by `ConnectorErrorsSettings::from_directive`.
#[derive(Debug, Clone, Default)]
pub struct ConnectorErrorsSettings {
    /// Selection for the error message: the connect-level one if present, otherwise the
    /// source-level one.
    pub message: Option<JSONSelection>,
    /// Extensions selection taken from the source-level error arguments.
    pub source_extensions: Option<JSONSelection>,
    /// Extensions selection taken from the connect-level error arguments.
    pub connect_extensions: Option<JSONSelection>,
    /// The `is_success` selection passed to `from_directive` (connect-level when set, else
    /// source-level, as chosen by `Connector::from_directives`).
    pub connect_is_success: Option<JSONSelection>,
}
91
92impl ConnectorErrorsSettings {
93 fn from_directive(
94 connect_errors: Option<&ErrorsArguments>,
95 source_errors: Option<&ErrorsArguments>,
96 connect_is_success: Option<&JSONSelection>,
97 ) -> Self {
98 let message = connect_errors
99 .and_then(|e| e.message.as_ref())
100 .or_else(|| source_errors.and_then(|e| e.message.as_ref()))
101 .cloned();
102 let source_extensions = source_errors.and_then(|e| e.extensions.as_ref()).cloned();
103 let connect_extensions = connect_errors.and_then(|e| e.extensions.as_ref()).cloned();
104 let connect_is_success = connect_is_success.cloned();
105 Self {
106 message,
107 source_extensions,
108 connect_extensions,
109 connect_is_success,
110 }
111 }
112
113 pub fn variable_references(&self) -> impl Iterator<Item = VariableReference<Namespace>> + '_ {
114 self.message
115 .as_ref()
116 .into_iter()
117 .flat_map(|m| m.variable_references())
118 .chain(
119 self.source_extensions
120 .as_ref()
121 .into_iter()
122 .flat_map(|m| m.variable_references()),
123 )
124 .chain(
125 self.connect_extensions
126 .as_ref()
127 .into_iter()
128 .flat_map(|m| m.variable_references()),
129 )
130 .chain(
131 self.connect_is_success
132 .as_ref()
133 .into_iter()
134 .flat_map(|m| m.variable_references()),
135 )
136 }
137
138 pub fn shape(&self) -> Option<Shape> {
143 let parts: Vec<Shape> = [
144 self.message.as_ref(),
145 self.source_extensions.as_ref(),
146 self.connect_extensions.as_ref(),
147 ]
148 .into_iter()
149 .flatten()
150 .map(|s| s.shape())
151 .collect();
152
153 match parts.len() {
154 0 => None,
155 1 => parts.into_iter().next(),
156 _ => Some(Shape::one(parts, [])),
157 }
158 }
159}
160
/// Shared, string-keyed map of JSON values; the type of [`Connector::config`].
pub type CustomConfiguration = Arc<HashMap<String, Value>>;
162
/// The ways a connector can act as an entity resolver, as chosen by
/// `determine_entity_resolver`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EntityResolver {
    /// Field-position connector whose directive arguments set `entity` to true. Its key is
    /// built from `Namespace::Args` references.
    Explicit,

    /// Field-position connector that does not set `entity` and is not on a root type. Its key
    /// is built from `Namespace::This` references.
    Implicit,

    /// Type-position connector whose request references the `Namespace::Batch` namespace. Its
    /// key is built from `Namespace::Batch` references.
    TypeBatch,

    /// Type-position connector whose request does not reference `Namespace::Batch`. Its key is
    /// built from `Namespace::This` references.
    TypeSingle,
}
182
183impl Connector {
184 pub fn from_schema(schema: &Schema, subgraph_name: &str) -> Result<Vec<Self>, FederationError> {
191 let Some(link) = ConnectLink::new(schema) else {
192 return Ok(Default::default());
193 };
194 let link = link.map_err(|message| SingleFederationError::UnknownLinkVersion {
195 message: message.message,
196 })?;
197
198 let source_arguments =
199 extract_source_directive_arguments(schema, &link.source_directive_name)?;
200
201 let connect_arguments =
202 extract_connect_directive_arguments(schema, &link.connect_directive_name)?;
203
204 connect_arguments
205 .into_iter()
206 .map(|args| {
207 Self::from_directives(schema, subgraph_name, link.spec, args, &source_arguments)
208 })
209 .collect::<Result<Vec<_>, _>>()
210 }
211
212 fn from_directives(
213 schema: &Schema,
214 subgraph_name: &str,
215 spec: ConnectSpec,
216 connect: ConnectDirectiveArguments,
217 source_arguments: &[SourceDirectiveArguments],
218 ) -> Result<Self, FederationError> {
219 let source = connect
220 .source
221 .and_then(|name| source_arguments.iter().find(|s| s.name == name));
222 let source_name = source.map(|s| s.name.clone());
223
224 let source_http = source.map(|s| &s.http);
226 let transport = connect
227 .http
228 .map(|connect_http| HttpJsonTransport::from_directive(connect_http, source_http, spec))
229 .transpose()?;
230
231 let batch_settings = connect.batch;
233 let connect_errors = connect.errors.as_ref();
234 let source_errors = source.and_then(|s| s.errors.as_ref());
235 let is_success = connect
237 .is_success
238 .as_ref()
239 .or_else(|| source.and_then(|s| s.is_success.as_ref()));
240
241 let error_settings =
242 ConnectorErrorsSettings::from_directive(connect_errors, source_errors, is_success);
243
244 let request_references: IndexSet<VariableReference<Namespace>> = transport
246 .as_ref()
247 .map(|t| t.variable_references().collect())
248 .unwrap_or_default();
249
250 let response_references: IndexSet<VariableReference<Namespace>> = connect
252 .selection
253 .variable_references()
254 .chain(error_settings.variable_references())
255 .collect();
256
257 let request_variable_keys = extract_variable_key_references(request_references.iter());
260 let response_variable_keys = extract_variable_key_references(response_references.iter());
261
262 let request_headers = extract_header_references(&request_references); let response_headers = extract_header_references(&response_references); let entity_resolver = determine_entity_resolver(
268 &connect.position,
269 connect.entity,
270 schema,
271 &request_variable_keys,
272 );
273 let label = Label::new(
274 subgraph_name,
275 source_name.as_ref(),
276 transport.as_ref(),
277 entity_resolver.as_ref(),
278 );
279 let id = ConnectId {
280 subgraph_name: subgraph_name.to_string(),
281 source_name,
282 named: connect.connector_id,
283 directive: connect.position,
284 };
285
286 Ok(Connector {
287 id,
288 transport,
289 selection: connect.selection,
290 entity_resolver,
291 config: None,
292 max_requests: None,
293 spec,
294 schema_subtypes_map: Connector::subtypes_map_from_schema(schema),
295 request_headers,
296 response_headers,
297 request_variable_keys,
298 response_variable_keys,
299 batch_settings,
300 error_settings,
301 label,
302 })
303 }
304
305 pub fn subtypes_map_from_schema(schema: &Schema) -> IndexMap<String, IndexSet<String>> {
306 let mut subtypes_map: IndexMap<String, IndexSet<String>> = IndexMap::default();
307
308 for (name, ty) in schema.types.iter() {
310 match ty {
311 ExtendedType::Object(o) => {
312 for supertype in &o.implements_interfaces {
313 subtypes_map
314 .entry(supertype.to_string())
315 .or_default()
316 .insert(name.to_string());
317 }
318 }
319 ExtendedType::Interface(i) => {
320 for supertype in &i.implements_interfaces {
321 subtypes_map
322 .entry(supertype.to_string())
323 .or_default()
324 .insert(name.to_string());
325 }
326 }
327 ExtendedType::Union(u) => {
328 for member in &u.members {
329 subtypes_map
330 .entry(u.name.to_string())
331 .or_default()
332 .insert(member.to_string());
333 }
334 }
335 _ => {
336 }
338 }
339 }
340
341 subtypes_map
342 }
343
344 pub(crate) fn variable_references(&self) -> impl Iterator<Item = VariableReference<Namespace>> {
345 let transport_refs = self
346 .transport
347 .as_ref()
348 .into_iter()
349 .flat_map(|t| t.variable_references());
350 let selection_refs = self
351 .selection
352 .external_var_paths()
353 .into_iter()
354 .flat_map(PathSelection::variable_reference);
355 transport_refs.chain(selection_refs)
356 }
357
358 pub fn resolvable_key(&self, schema: &Schema) -> Result<Option<Valid<FieldSet>>, String> {
360 match &self.entity_resolver {
361 None => Ok(None),
362 Some(EntityResolver::Explicit) => {
363 make_key_field_set_from_variables(
364 schema,
365 &self.id.directive.base_type_name(schema).ok_or_else(|| {
366 format!("Missing field {}", self.id.directive.coordinate())
367 })?,
368 self.variable_references(),
369 Namespace::Args,
370 )
371 }
372 Some(EntityResolver::Implicit) => {
373 make_key_field_set_from_variables(
374 schema,
375 &self.id.directive.parent_type_name().ok_or_else(|| {
376 format!("Missing type {}", self.id.directive.coordinate())
377 })?,
378 self.variable_references(),
379 Namespace::This,
380 )
381 }
382 Some(EntityResolver::TypeBatch) => {
383 make_key_field_set_from_variables(
384 schema,
385 &self.id.directive.base_type_name(schema).ok_or_else(|| {
386 format!("Missing type {}", self.id.directive.coordinate())
387 })?,
388 self.variable_references(),
389 Namespace::Batch,
390 )
391 }
392 Some(EntityResolver::TypeSingle) => {
393 make_key_field_set_from_variables(
394 schema,
395 &self.id.directive.base_type_name(schema).ok_or_else(|| {
396 format!("Missing type {}", self.id.directive.coordinate())
397 })?,
398 self.variable_references(),
399 Namespace::This,
400 )
401 }
402 }
403 .map_err(|_| {
404 format!(
405 "Failed to create key for connector {}",
406 self.id.coordinate()
407 )
408 })
409 }
410
411 pub fn source_config_key(&self) -> String {
415 if let Some(source_name) = &self.id.source_name {
416 format!("{}.{}", self.id.subgraph_name, source_name)
417 } else {
418 format!("{}.{}", self.id.subgraph_name, self.id.synthetic_name())
419 }
420 }
421
422 pub fn name(&self) -> Name {
426 match &self.id.directive {
427 ConnectorPosition::Field(field_position) => field_position.directive_name.clone(),
428 ConnectorPosition::Type(type_position) => type_position.directive_name.clone(),
429 }
430 }
431
432 pub fn id(&self) -> String {
434 self.id.name()
435 }
436
437 pub fn abstract_types(&self) -> IndexSet<String> {
439 self.schema_subtypes_map.keys().cloned().collect()
440 }
441
442 pub fn output_shape(&self) -> Shape {
451 let selection_shape = self.selection.shape();
452 match self.error_settings.shape() {
453 Some(error_shape) => Shape::one([selection_shape, error_shape], []),
454 None => selection_shape,
455 }
456 }
457}
458
/// Human-readable label for a connector.
///
/// `Label::new` formats it as an optional `[BATCH] ` prefix, then `<subgraph>.<source>`, a
/// space, and the transport's label (or `mappingOnly` when there is no transport).
#[derive(Debug, Clone)]
pub struct Label(pub String);
462
463impl Label {
464 fn new(
465 subgraph_name: &str,
466 source: Option<&SourceName>,
467 transport: Option<&HttpJsonTransport>,
468 entity_resolver: Option<&EntityResolver>,
469 ) -> Self {
470 let source = source.map(SourceName::as_str).unwrap_or_default();
471 let batch = match entity_resolver {
472 Some(EntityResolver::TypeBatch) => "[BATCH] ",
473 _ => "",
474 };
475 let transport_label = transport
476 .map(|t| t.label())
477 .unwrap_or_else(|| "mappingOnly".to_string());
478 Self(format!("{batch}{subgraph_name}.{source} {transport_label}"))
479 }
480}
481
482impl From<&str> for Label {
483 fn from(label: &str) -> Self {
484 Self(label.to_string())
485 }
486}
487
488impl AsRef<str> for Label {
489 fn as_ref(&self) -> &str {
490 &self.0
491 }
492}
493
494fn determine_entity_resolver(
495 position: &ConnectorPosition,
496 entity: bool,
497 schema: &Schema,
498 request_variables: &IndexMap<Namespace, IndexSet<String>>,
499) -> Option<EntityResolver> {
500 match position {
501 ConnectorPosition::Field(_) => {
502 match (entity, position.on_root_type(schema)) {
503 (true, _) => Some(EntityResolver::Explicit), (_, false) => Some(EntityResolver::Implicit), _ => None,
506 }
507 }
508 ConnectorPosition::Type(_) => {
509 if request_variables.contains_key(&Namespace::Batch) {
510 Some(EntityResolver::TypeBatch) } else {
512 Some(EntityResolver::TypeSingle) }
514 }
515 }
516}
517
518fn extract_header_references(
520 variable_references: &IndexSet<VariableReference<Namespace>>,
521) -> HashSet<String> {
522 variable_references
523 .iter()
524 .flat_map(|var_ref| {
525 if var_ref.namespace.namespace != Namespace::Request
526 && var_ref.namespace.namespace != Namespace::Response
527 {
528 Vec::new()
529 } else {
530 var_ref
531 .selection
532 .get("headers")
533 .map(|headers_subtrie| headers_subtrie.keys().cloned().collect())
534 .unwrap_or_default()
535 }
536 })
537 .collect()
538}
539
540fn extract_variable_key_references<'a>(
543 references: impl Iterator<Item = &'a VariableReference<Namespace>>,
544) -> IndexMap<Namespace, IndexSet<String>> {
545 let mut variable_keys: IndexMap<Namespace, IndexSet<String>> = IndexMap::default();
546
547 for var_ref in references {
548 let set = variable_keys
550 .entry(var_ref.namespace.namespace)
551 .or_default();
552
553 for key in var_ref.selection.keys() {
554 set.insert(key.to_string());
555 }
556 }
557
558 variable_keys
559}
560
// Unit tests for building connectors from a schema and for the shapes they report.
#[cfg(test)]
mod tests {
    use apollo_compiler::Schema;
    use insta::assert_debug_snapshot;

    use super::*;
    use crate::ValidFederationSubgraphs;
    use crate::schema::FederationSchema;
    use crate::supergraph::extract_subgraphs_from_supergraph;

    // Supergraph fixtures loaded at compile time.
    static SIMPLE_SUPERGRAPH: &str = include_str!("./tests/schemas/simple.graphql");
    static SIMPLE_SUPERGRAPH_V0_2: &str = include_str!("./tests/schemas/simple_v0_2.graphql");

    // Parses the supergraph SDL and extracts its subgraphs, panicking on any failure.
    fn get_subgraphs(supergraph_sdl: &str) -> ValidFederationSubgraphs {
        let schema = Schema::parse(supergraph_sdl, "supergraph.graphql").unwrap();
        let supergraph_schema = FederationSchema::new(schema).unwrap();
        extract_subgraphs_from_supergraph(&supergraph_schema, Some(true)).unwrap()
    }

    // Builds the connectors of the "connectors" subgraph and compares their Debug output
    // against the inline snapshot below.
    #[test]
    fn test_from_schema() {
        let subgraphs = get_subgraphs(SIMPLE_SUPERGRAPH);
        let subgraph = subgraphs.get("connectors").unwrap();
        let connectors = Connector::from_schema(subgraph.schema.schema(), "connectors").unwrap();
        assert_debug_snapshot!(&connectors, @r#"
        [
            Connector {
                id: ConnectId {
                    subgraph_name: "connectors",
                    source_name: Some(
                        "json",
                    ),
                    named: None,
                    directive: Field(
                        ObjectOrInterfaceFieldDirectivePosition {
                            field: Object(Query.users),
                            directive_name: "connect",
                            directive_index: 0,
                        },
                    ),
                },
                transport: Some(
                    HttpJsonTransport {
                        source_template: Some(
                            StringTemplate {
                                parts: [
                                    Constant(
                                        Constant {
                                            value: "https://jsonplaceholder.typicode.com/",
                                            location: 0..37,
                                        },
                                    ),
                                ],
                            },
                        ),
                        connect_template: StringTemplate {
                            parts: [
                                Constant(
                                    Constant {
                                        value: "/users",
                                        location: 0..6,
                                    },
                                ),
                            ],
                        },
                        method: Get,
                        headers: [
                            Header {
                                name: "authtoken",
                                source: From(
                                    "x-auth-token",
                                ),
                            },
                            Header {
                                name: "user-agent",
                                source: Value(
                                    HeaderValue(
                                        StringTemplate {
                                            parts: [
                                                Constant(
                                                    Constant {
                                                        value: "Firefox",
                                                        location: 0..7,
                                                    },
                                                ),
                                            ],
                                        },
                                    ),
                                ),
                            },
                        ],
                        body: None,
                        source_path: None,
                        source_query_params: None,
                        connect_path: None,
                        connect_query_params: None,
                    },
                ),
                selection: JSONSelection {
                    inner: Named(
                        SubSelection {
                            selections: [
                                NamedSelection {
                                    prefix: None,
                                    path: WithRange {
                                        node: Path(
                                            PathSelection {
                                                path: WithRange {
                                                    node: Key(
                                                        WithRange {
                                                            node: Field(
                                                                "id",
                                                            ),
                                                            range: Some(
                                                                0..2,
                                                            ),
                                                        },
                                                        WithRange {
                                                            node: Empty,
                                                            range: Some(
                                                                2..2,
                                                            ),
                                                        },
                                                    ),
                                                    range: Some(
                                                        0..2,
                                                    ),
                                                },
                                            },
                                        ),
                                        range: Some(
                                            0..2,
                                        ),
                                    },
                                },
                                NamedSelection {
                                    prefix: None,
                                    path: WithRange {
                                        node: Path(
                                            PathSelection {
                                                path: WithRange {
                                                    node: Key(
                                                        WithRange {
                                                            node: Field(
                                                                "name",
                                                            ),
                                                            range: Some(
                                                                3..7,
                                                            ),
                                                        },
                                                        WithRange {
                                                            node: Empty,
                                                            range: Some(
                                                                7..7,
                                                            ),
                                                        },
                                                    ),
                                                    range: Some(
                                                        3..7,
                                                    ),
                                                },
                                            },
                                        ),
                                        range: Some(
                                            3..7,
                                        ),
                                    },
                                },
                            ],
                            range: Some(
                                0..7,
                            ),
                        },
                    ),
                    spec: V0_1,
                },
                config: None,
                max_requests: None,
                entity_resolver: None,
                spec: V0_1,
                schema_subtypes_map: {
                    "_Entity": {
                        "User",
                    },
                },
                request_headers: {},
                response_headers: {},
                request_variable_keys: {},
                response_variable_keys: {},
                batch_settings: None,
                error_settings: ConnectorErrorsSettings {
                    message: None,
                    source_extensions: None,
                    connect_extensions: None,
                    connect_is_success: None,
                },
                label: Label(
                    "connectors.json http: GET /users",
                ),
            },
            Connector {
                id: ConnectId {
                    subgraph_name: "connectors",
                    source_name: Some(
                        "json",
                    ),
                    named: None,
                    directive: Field(
                        ObjectOrInterfaceFieldDirectivePosition {
                            field: Object(Query.posts),
                            directive_name: "connect",
                            directive_index: 0,
                        },
                    ),
                },
                transport: Some(
                    HttpJsonTransport {
                        source_template: Some(
                            StringTemplate {
                                parts: [
                                    Constant(
                                        Constant {
                                            value: "https://jsonplaceholder.typicode.com/",
                                            location: 0..37,
                                        },
                                    ),
                                ],
                            },
                        ),
                        connect_template: StringTemplate {
                            parts: [
                                Constant(
                                    Constant {
                                        value: "/posts",
                                        location: 0..6,
                                    },
                                ),
                            ],
                        },
                        method: Get,
                        headers: [
                            Header {
                                name: "authtoken",
                                source: From(
                                    "x-auth-token",
                                ),
                            },
                            Header {
                                name: "user-agent",
                                source: Value(
                                    HeaderValue(
                                        StringTemplate {
                                            parts: [
                                                Constant(
                                                    Constant {
                                                        value: "Firefox",
                                                        location: 0..7,
                                                    },
                                                ),
                                            ],
                                        },
                                    ),
                                ),
                            },
                        ],
                        body: None,
                        source_path: None,
                        source_query_params: None,
                        connect_path: None,
                        connect_query_params: None,
                    },
                ),
                selection: JSONSelection {
                    inner: Named(
                        SubSelection {
                            selections: [
                                NamedSelection {
                                    prefix: None,
                                    path: WithRange {
                                        node: Path(
                                            PathSelection {
                                                path: WithRange {
                                                    node: Key(
                                                        WithRange {
                                                            node: Field(
                                                                "id",
                                                            ),
                                                            range: Some(
                                                                0..2,
                                                            ),
                                                        },
                                                        WithRange {
                                                            node: Empty,
                                                            range: Some(
                                                                2..2,
                                                            ),
                                                        },
                                                    ),
                                                    range: Some(
                                                        0..2,
                                                    ),
                                                },
                                            },
                                        ),
                                        range: Some(
                                            0..2,
                                        ),
                                    },
                                },
                                NamedSelection {
                                    prefix: None,
                                    path: WithRange {
                                        node: Path(
                                            PathSelection {
                                                path: WithRange {
                                                    node: Key(
                                                        WithRange {
                                                            node: Field(
                                                                "title",
                                                            ),
                                                            range: Some(
                                                                3..8,
                                                            ),
                                                        },
                                                        WithRange {
                                                            node: Empty,
                                                            range: Some(
                                                                8..8,
                                                            ),
                                                        },
                                                    ),
                                                    range: Some(
                                                        3..8,
                                                    ),
                                                },
                                            },
                                        ),
                                        range: Some(
                                            3..8,
                                        ),
                                    },
                                },
                                NamedSelection {
                                    prefix: None,
                                    path: WithRange {
                                        node: Path(
                                            PathSelection {
                                                path: WithRange {
                                                    node: Key(
                                                        WithRange {
                                                            node: Field(
                                                                "body",
                                                            ),
                                                            range: Some(
                                                                9..13,
                                                            ),
                                                        },
                                                        WithRange {
                                                            node: Empty,
                                                            range: Some(
                                                                13..13,
                                                            ),
                                                        },
                                                    ),
                                                    range: Some(
                                                        9..13,
                                                    ),
                                                },
                                            },
                                        ),
                                        range: Some(
                                            9..13,
                                        ),
                                    },
                                },
                            ],
                            range: Some(
                                0..13,
                            ),
                        },
                    ),
                    spec: V0_1,
                },
                config: None,
                max_requests: None,
                entity_resolver: None,
                spec: V0_1,
                schema_subtypes_map: {
                    "_Entity": {
                        "User",
                    },
                },
                request_headers: {},
                response_headers: {},
                request_variable_keys: {},
                response_variable_keys: {},
                batch_settings: None,
                error_settings: ConnectorErrorsSettings {
                    message: None,
                    source_extensions: None,
                    connect_extensions: None,
                    connect_is_success: None,
                },
                label: Label(
                    "connectors.json http: GET /posts",
                ),
            },
        ]
        "#);
    }

    // Same as `test_from_schema`, but for the v0.2 fixture. No inline snapshot is given, so
    // the comparison is against one stored outside this file.
    #[test]
    fn test_from_schema_v0_2() {
        let subgraphs = get_subgraphs(SIMPLE_SUPERGRAPH_V0_2);
        let subgraph = subgraphs.get("connectors").unwrap();
        let connectors = Connector::from_schema(subgraph.schema.schema(), "connectors").unwrap();
        assert_debug_snapshot!(&connectors);
    }

    // Tests for `ConnectorErrorsSettings::shape` and `Connector::output_shape`.
    mod output_shape {
        use shape::ShapeCase;

        use super::*;
        use crate::connectors::JSONSelection;

        // Default settings have no selections configured, so there is no shape.
        #[test]
        fn errors_settings_shape_is_none_when_no_mappings_configured() {
            let settings = ConnectorErrorsSettings::default();
            assert!(
                settings.shape().is_none(),
                "expected None when no error mappings are set"
            );
        }

        // With message and both extensions configured, the shape is a `ShapeCase::One` union.
        #[test]
        fn errors_settings_shape_unions_all_configured_mappings() {
            let settings = ConnectorErrorsSettings {
                message: Some(JSONSelection::parse("error.message").unwrap()),
                source_extensions: Some(JSONSelection::parse("source_code: error.code").unwrap()),
                connect_extensions: Some(JSONSelection::parse("code: error.code").unwrap()),
                connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
            };
            let shape = settings
                .shape()
                .expect("expected Some shape when message + extensions are configured");
            assert!(
                matches!(shape.case(), ShapeCase::One(_)),
                "expected ShapeCase::One union of message + source/connect extensions, got: {shape:?}"
            );
        }

        // With error mappings installed on a real connector, `output_shape` is a
        // `ShapeCase::One` union.
        #[test]
        fn connector_output_shape_unions_selection_with_errors() {
            let subgraphs = get_subgraphs(SIMPLE_SUPERGRAPH);
            let subgraph = subgraphs.get("connectors").unwrap();
            let mut connectors =
                Connector::from_schema(subgraph.schema.schema(), "connectors").unwrap();
            let mut connector = connectors.remove(0);

            // Replace the (empty) error settings parsed from the fixture.
            connector.error_settings = ConnectorErrorsSettings {
                message: Some(JSONSelection::parse("error.message").unwrap()),
                source_extensions: None,
                connect_extensions: Some(JSONSelection::parse("code: error.code").unwrap()),
                connect_is_success: Some(JSONSelection::parse("$status->eq(200)").unwrap()),
            };

            let combined = connector.output_shape();
            assert!(
                matches!(combined.case(), ShapeCase::One(_)),
                "expected ShapeCase::One union of selection + errors, got: {combined:?}"
            );
        }

        // Without error mappings, `output_shape` equals the selection's own shape.
        #[test]
        fn connector_output_shape_with_no_errors_equals_selection_shape() {
            let subgraphs = get_subgraphs(SIMPLE_SUPERGRAPH);
            let subgraph = subgraphs.get("connectors").unwrap();
            let mut connectors =
                Connector::from_schema(subgraph.schema.schema(), "connectors").unwrap();
            let connector = connectors.remove(0);

            assert_eq!(
                connector.output_shape(),
                connector.selection.shape(),
                "with no error mappings, output_shape should match selection.shape"
            );
        }
    }
}