// uhg_custom_appollo_roouter/uplink/schema_stream.rs
#![allow(clippy::derive_partial_eq_without_eq)]
5
6use graphql_client::GraphQLQuery;
7
8use super::schema::SchemaState;
9use crate::uplink::UplinkRequest;
10use crate::uplink::UplinkResponse;
11use crate::uplink::schema_stream::supergraph_sdl_query::FetchErrorCode;
12use crate::uplink::schema_stream::supergraph_sdl_query::SupergraphSdlQueryRouterConfig;
13
14#[derive(GraphQLQuery)]
15#[graphql(
16 query_path = "src/uplink/schema_query.graphql",
17 schema_path = "src/uplink/uplink.graphql",
18 request_derives = "Debug",
19 response_derives = "PartialEq, Debug, Deserialize",
20 deprecated = "warn"
21)]
22pub(crate) struct SupergraphSdlQuery;
23
24impl From<UplinkRequest> for supergraph_sdl_query::Variables {
25 fn from(req: UplinkRequest) -> Self {
26 supergraph_sdl_query::Variables {
27 api_key: req.api_key,
28 graph_ref: req.graph_ref,
29 if_after_id: req.id,
30 }
31 }
32}
33
34impl From<supergraph_sdl_query::ResponseData> for UplinkResponse<String> {
35 fn from(response: supergraph_sdl_query::ResponseData) -> Self {
36 match response.router_config {
37 SupergraphSdlQueryRouterConfig::RouterConfigResult(result) => UplinkResponse::New {
38 response: result.supergraph_sdl,
39 id: result.id,
40 delay: result.min_delay_seconds as u64,
43 },
44 SupergraphSdlQueryRouterConfig::Unchanged(response) => UplinkResponse::Unchanged {
45 id: Some(response.id),
46 delay: Some(response.min_delay_seconds as u64),
47 },
48 SupergraphSdlQueryRouterConfig::FetchError(err) => UplinkResponse::Error {
49 retry_later: err.code == FetchErrorCode::RETRY_LATER,
50 code: match err.code {
51 FetchErrorCode::AUTHENTICATION_FAILED => "AUTHENTICATION_FAILED".to_string(),
52 FetchErrorCode::ACCESS_DENIED => "ACCESS_DENIED".to_string(),
53 FetchErrorCode::UNKNOWN_REF => "UNKNOWN_REF".to_string(),
54 FetchErrorCode::RETRY_LATER => "RETRY_LATER".to_string(),
55 FetchErrorCode::NOT_IMPLEMENTED_ON_THIS_INSTANCE => {
56 "NOT_IMPLEMENTED_ON_THIS_INSTANCE".to_string()
57 }
58 FetchErrorCode::Other(other) => other,
59 },
60 message: err.message,
61 },
62 }
63 }
64}
65
66impl From<supergraph_sdl_query::ResponseData> for UplinkResponse<SchemaState> {
67 fn from(response: supergraph_sdl_query::ResponseData) -> Self {
68 match response.router_config {
69 SupergraphSdlQueryRouterConfig::RouterConfigResult(result) => UplinkResponse::New {
70 response: SchemaState {
71 sdl: result.supergraph_sdl,
72 launch_id: Some(result.id.clone()),
73 },
74 id: result.id,
75 delay: result.min_delay_seconds as u64,
78 },
79 SupergraphSdlQueryRouterConfig::Unchanged(response) => UplinkResponse::Unchanged {
80 id: Some(response.id),
81 delay: Some(response.min_delay_seconds as u64),
82 },
83 SupergraphSdlQueryRouterConfig::FetchError(err) => UplinkResponse::Error {
84 retry_later: err.code == FetchErrorCode::RETRY_LATER,
85 code: match err.code {
86 FetchErrorCode::AUTHENTICATION_FAILED => "AUTHENTICATION_FAILED".to_string(),
87 FetchErrorCode::ACCESS_DENIED => "ACCESS_DENIED".to_string(),
88 FetchErrorCode::UNKNOWN_REF => "UNKNOWN_REF".to_string(),
89 FetchErrorCode::RETRY_LATER => "RETRY_LATER".to_string(),
90 FetchErrorCode::NOT_IMPLEMENTED_ON_THIS_INSTANCE => {
91 "NOT_IMPLEMENTED_ON_THIS_INSTANCE".to_string()
92 }
93 FetchErrorCode::Other(other) => other,
94 },
95 message: err.message,
96 },
97 }
98 }
99}
100
#[cfg(test)]
mod test {
    use std::str::FromStr;
    use std::time::Duration;

    use futures::stream::StreamExt;
    use url::Url;

    use crate::uplink::AWS_URL;
    use crate::uplink::Endpoints;
    use crate::uplink::GCP_URL;
    use crate::uplink::UplinkConfig;
    use crate::uplink::schema_stream::SupergraphSdlQuery;
    use crate::uplink::stream_from_uplink;

    /// Fetches one schema from each of the GCP and AWS uplink URLs and checks
    /// that it contains `type Product`.
    ///
    /// An endpoint is skipped unless both `TEST_APOLLO_KEY` and
    /// `TEST_APOLLO_GRAPH_REF` are set in the environment.
    #[tokio::test]
    async fn integration_test() {
        for url in &[GCP_URL, AWS_URL] {
            let credentials = (
                std::env::var("TEST_APOLLO_KEY"),
                std::env::var("TEST_APOLLO_GRAPH_REF"),
            );
            let (apollo_key, apollo_graph_ref) = match credentials {
                (Ok(key), Ok(graph_ref)) => (key, graph_ref),
                // Nothing to exercise for this endpoint without both variables.
                _ => continue,
            };

            let endpoint = Url::from_str(url).expect("url must be valid");
            let config = UplinkConfig {
                apollo_key,
                apollo_graph_ref,
                endpoints: Some(Endpoints::fallback(vec![endpoint])),
                poll_interval: Duration::from_secs(1),
                timeout: Duration::from_secs(5),
            };

            // Only the first item of the stream is needed.
            let results = stream_from_uplink::<SupergraphSdlQuery, String>(config)
                .take(1)
                .collect::<Vec<_>>()
                .await;

            let first = match results.first() {
                Some(outcome) => outcome,
                None => panic!("expected one result from {}", url),
            };
            let schema = match first {
                Ok(sdl) => sdl,
                Err(_) => panic!("schema should be OK from {}", url),
            };
            assert!(schema.contains("type Product"));
        }
    }
}