1use crate::configuration::subgraph::SubGraphConfig;
2use async_graphql::{
3 dynamic::Schema,
4 http::{playground_source, GraphQLPlaygroundConfig},
5};
6use async_graphql_warp::{GraphQLBadRequest, GraphQLResponse};
7use http::{HeaderMap, StatusCode};
8use local_ip_address::local_ip;
9use log::{info, trace};
10use std::convert::Infallible;
11use warp::{http::Response as HttpResponse, Filter, Future, Rejection};
12
13pub mod cli_args;
14pub mod configuration;
15pub mod data_sources;
16pub mod filter_operator;
17pub mod graphql;
18pub mod resolver_type;
19pub mod scalar_option;
20pub mod sql_value;
21pub mod traits;
22pub mod utils;
23
24pub async fn run(
26 args: cli_args::CliArgs,
27 subgraph_config: SubGraphConfig,
28) -> Result<
29 (
30 impl Future<Output = ()>,
31 Schema,
32 tokio::sync::oneshot::Sender<()>,
33 ),
34 std::io::Error,
35> {
36 info!("⛵ Starting Subgraph Service");
37 trace!("Service Arguments: {:?}", args);
38
39 let data_sources = data_sources::DataSources::init(
41 subgraph_config.service.data_sources.clone(),
42 &args,
43 &subgraph_config,
44 )
45 .await;
46
47 let schema = graphql::schema::ServiceSchema::new(subgraph_config.clone(), data_sources).build();
49
50 let graphql_post = warp::path("graphql")
52 .and(async_graphql_warp::graphql(schema.clone()))
53 .and(warp::header::headers_cloned())
54 .and_then(
55 |(schema, request): (Schema, async_graphql::Request), headers: HeaderMap| async move {
56 trace!("Request: {:?}", request);
57 let dynamic_response = schema.execute(request.data(headers)).await;
58 let response = GraphQLResponse::from(dynamic_response);
59 Ok::<_, Infallible>(response)
60 },
61 );
62
63 let graphql_playground = warp::path("playground").and(warp::get()).map(|| {
65 HttpResponse::builder().body(playground_source(GraphQLPlaygroundConfig::new("/graphql")))
66 });
67
68 let cors = configuration::cors_config::CorsConfig::create_cors(subgraph_config.clone());
70
71 let routes =
73 graphql_playground
74 .or(graphql_post)
75 .with(cors)
76 .recover(|err: Rejection| async move {
77 if let Some(GraphQLBadRequest(err)) = err.find() {
78 return Ok::<_, Infallible>(warp::reply::with_status(
79 err.to_string(),
80 StatusCode::BAD_REQUEST,
81 ));
82 }
83
84 Ok(warp::reply::with_status(
85 "INTERNAL_SERVER_ERROR".to_string(),
86 StatusCode::INTERNAL_SERVER_ERROR,
87 ))
88 });
89
90 let port = match args.port.clone() {
92 Some(port) => port,
93 None => match subgraph_config.service.port.clone() {
94 Some(port) => port,
95 None => 0,
96 },
97 };
98
99 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
101
102 let host = match args.host.clone() {
104 true => {
105 let ip = local_ip().expect("Failed to get local IP address");
106 info!("🛝 Playground: http://{:?}:{:?}", ip, port);
107 [0, 0, 0, 0]
108 }
109 false => match subgraph_config.service.host {
110 Some(_host) => {
111 let ip = local_ip().expect("Failed to get local IP address");
112 info!("🛝 Playground: http://{:?}:{:?}", ip, port);
113 [0, 0, 0, 0]
114 }
115 None => {
116 info!("🛝 Playground: http://localhost:{:?}", port);
117 [127, 0, 0, 1]
118 }
119 },
120 };
121
122 let (_addr, server) = warp::serve(routes).bind_with_graceful_shutdown((host, port), async {
124 rx.await.ok();
125 });
126
127 info!("❇️ Subgraph Service Started");
128
129 Ok((server, schema, tx))
130}