uhg_custom_appollo_roouter/uplink/
schema_stream.rs

1// tonic does not derive `Eq` for the gRPC message types, which causes a warning from Clippy. The
2// current suggestion is to explicitly allow the lint in the module that imports the protos.
3// Read more: https://github.com/hyperium/tonic/issues/1056
4#![allow(clippy::derive_partial_eq_without_eq)]
5
6use graphql_client::GraphQLQuery;
7
8use super::schema::SchemaState;
9use crate::uplink::UplinkRequest;
10use crate::uplink::UplinkResponse;
11use crate::uplink::schema_stream::supergraph_sdl_query::FetchErrorCode;
12use crate::uplink::schema_stream::supergraph_sdl_query::SupergraphSdlQueryRouterConfig;
13
14#[derive(GraphQLQuery)]
15#[graphql(
16    query_path = "src/uplink/schema_query.graphql",
17    schema_path = "src/uplink/uplink.graphql",
18    request_derives = "Debug",
19    response_derives = "PartialEq, Debug, Deserialize",
20    deprecated = "warn"
21)]
22pub(crate) struct SupergraphSdlQuery;
23
24impl From<UplinkRequest> for supergraph_sdl_query::Variables {
25    fn from(req: UplinkRequest) -> Self {
26        supergraph_sdl_query::Variables {
27            api_key: req.api_key,
28            graph_ref: req.graph_ref,
29            if_after_id: req.id,
30        }
31    }
32}
33
34impl From<supergraph_sdl_query::ResponseData> for UplinkResponse<String> {
35    fn from(response: supergraph_sdl_query::ResponseData) -> Self {
36        match response.router_config {
37            SupergraphSdlQueryRouterConfig::RouterConfigResult(result) => UplinkResponse::New {
38                response: result.supergraph_sdl,
39                id: result.id,
40                // this will truncate the number of seconds to under u64::MAX, which should be
41                // a large enough delay anyway
42                delay: result.min_delay_seconds as u64,
43            },
44            SupergraphSdlQueryRouterConfig::Unchanged(response) => UplinkResponse::Unchanged {
45                id: Some(response.id),
46                delay: Some(response.min_delay_seconds as u64),
47            },
48            SupergraphSdlQueryRouterConfig::FetchError(err) => UplinkResponse::Error {
49                retry_later: err.code == FetchErrorCode::RETRY_LATER,
50                code: match err.code {
51                    FetchErrorCode::AUTHENTICATION_FAILED => "AUTHENTICATION_FAILED".to_string(),
52                    FetchErrorCode::ACCESS_DENIED => "ACCESS_DENIED".to_string(),
53                    FetchErrorCode::UNKNOWN_REF => "UNKNOWN_REF".to_string(),
54                    FetchErrorCode::RETRY_LATER => "RETRY_LATER".to_string(),
55                    FetchErrorCode::NOT_IMPLEMENTED_ON_THIS_INSTANCE => {
56                        "NOT_IMPLEMENTED_ON_THIS_INSTANCE".to_string()
57                    }
58                    FetchErrorCode::Other(other) => other,
59                },
60                message: err.message,
61            },
62        }
63    }
64}
65
66impl From<supergraph_sdl_query::ResponseData> for UplinkResponse<SchemaState> {
67    fn from(response: supergraph_sdl_query::ResponseData) -> Self {
68        match response.router_config {
69            SupergraphSdlQueryRouterConfig::RouterConfigResult(result) => UplinkResponse::New {
70                response: SchemaState {
71                    sdl: result.supergraph_sdl,
72                    launch_id: Some(result.id.clone()),
73                },
74                id: result.id,
75                // this will truncate the number of seconds to under u64::MAX, which should be
76                // a large enough delay anyway
77                delay: result.min_delay_seconds as u64,
78            },
79            SupergraphSdlQueryRouterConfig::Unchanged(response) => UplinkResponse::Unchanged {
80                id: Some(response.id),
81                delay: Some(response.min_delay_seconds as u64),
82            },
83            SupergraphSdlQueryRouterConfig::FetchError(err) => UplinkResponse::Error {
84                retry_later: err.code == FetchErrorCode::RETRY_LATER,
85                code: match err.code {
86                    FetchErrorCode::AUTHENTICATION_FAILED => "AUTHENTICATION_FAILED".to_string(),
87                    FetchErrorCode::ACCESS_DENIED => "ACCESS_DENIED".to_string(),
88                    FetchErrorCode::UNKNOWN_REF => "UNKNOWN_REF".to_string(),
89                    FetchErrorCode::RETRY_LATER => "RETRY_LATER".to_string(),
90                    FetchErrorCode::NOT_IMPLEMENTED_ON_THIS_INSTANCE => {
91                        "NOT_IMPLEMENTED_ON_THIS_INSTANCE".to_string()
92                    }
93                    FetchErrorCode::Other(other) => other,
94                },
95                message: err.message,
96            },
97        }
98    }
99}
100
101#[cfg(test)]
102mod test {
103    use std::str::FromStr;
104    use std::time::Duration;
105
106    use futures::stream::StreamExt;
107    use url::Url;
108
109    use crate::uplink::AWS_URL;
110    use crate::uplink::Endpoints;
111    use crate::uplink::GCP_URL;
112    use crate::uplink::UplinkConfig;
113    use crate::uplink::schema_stream::SupergraphSdlQuery;
114    use crate::uplink::stream_from_uplink;
115
116    #[tokio::test]
117    async fn integration_test() {
118        for url in &[GCP_URL, AWS_URL] {
119            if let (Ok(apollo_key), Ok(apollo_graph_ref)) = (
120                std::env::var("TEST_APOLLO_KEY"),
121                std::env::var("TEST_APOLLO_GRAPH_REF"),
122            ) {
123                let results = stream_from_uplink::<SupergraphSdlQuery, String>(UplinkConfig {
124                    apollo_key,
125                    apollo_graph_ref,
126                    endpoints: Some(Endpoints::fallback(vec![
127                        Url::from_str(url).expect("url must be valid"),
128                    ])),
129                    poll_interval: Duration::from_secs(1),
130                    timeout: Duration::from_secs(5),
131                })
132                .take(1)
133                .collect::<Vec<_>>()
134                .await;
135
136                let schema = results
137                    .first()
138                    .unwrap_or_else(|| panic!("expected one result from {}", url))
139                    .as_ref()
140                    .unwrap_or_else(|_| panic!("schema should be OK from {}", url));
141                assert!(schema.contains("type Product"))
142            }
143        }
144    }
145}