subgraph/
lib.rs

1use crate::configuration::subgraph::SubGraphConfig;
2use async_graphql::{
3    dynamic::Schema,
4    http::{playground_source, GraphQLPlaygroundConfig},
5};
6use async_graphql_warp::{GraphQLBadRequest, GraphQLResponse};
7use http::{HeaderMap, StatusCode};
8use local_ip_address::local_ip;
9use log::{info, trace};
10use std::convert::Infallible;
11use warp::{http::Response as HttpResponse, Filter, Future, Rejection};
12
13pub mod cli_args;
14pub mod configuration;
15pub mod data_sources;
16pub mod filter_operator;
17pub mod graphql;
18pub mod resolver_type;
19pub mod scalar_option;
20pub mod sql_value;
21pub mod traits;
22pub mod utils;
23
24/// Starts the Subgraph Service. Initializes the DataSources and builds the GraphQL Schema.
25pub async fn run(
26    args: cli_args::CliArgs,
27    subgraph_config: SubGraphConfig,
28) -> Result<
29    (
30        impl Future<Output = ()>,
31        Schema,
32        tokio::sync::oneshot::Sender<()>,
33    ),
34    std::io::Error,
35> {
36    info!("⛵ Starting Subgraph Service");
37    trace!("Service Arguments: {:?}", args);
38
39    // Initialize DataSources
40    let data_sources = data_sources::DataSources::init(
41        subgraph_config.service.data_sources.clone(),
42        &args,
43        &subgraph_config,
44    )
45    .await;
46
47    // Build GraphQL Schema
48    let schema = graphql::schema::ServiceSchema::new(subgraph_config.clone(), data_sources).build();
49
50    // GraphQL Endpoint at /graphql
51    let graphql_post = warp::path("graphql")
52        .and(async_graphql_warp::graphql(schema.clone()))
53        .and(warp::header::headers_cloned())
54        .and_then(
55            |(schema, request): (Schema, async_graphql::Request), headers: HeaderMap| async move {
56                trace!("Request: {:?}", request);
57                let dynamic_response = schema.execute(request.data(headers)).await;
58                let response = GraphQLResponse::from(dynamic_response);
59                Ok::<_, Infallible>(response)
60            },
61        );
62
63    // GraphQL Playground Endpoint
64    let graphql_playground = warp::path("playground").and(warp::get()).map(|| {
65        HttpResponse::builder().body(playground_source(GraphQLPlaygroundConfig::new("/graphql")))
66    });
67
68    // CORS Config
69    let cors = configuration::cors_config::CorsConfig::create_cors(subgraph_config.clone());
70
71    // Routes - Combine GraphQL and GraphQL Playground
72    let routes =
73        graphql_playground
74            .or(graphql_post)
75            .with(cors)
76            .recover(|err: Rejection| async move {
77                if let Some(GraphQLBadRequest(err)) = err.find() {
78                    return Ok::<_, Infallible>(warp::reply::with_status(
79                        err.to_string(),
80                        StatusCode::BAD_REQUEST,
81                    ));
82                }
83
84                Ok(warp::reply::with_status(
85                    "INTERNAL_SERVER_ERROR".to_string(),
86                    StatusCode::INTERNAL_SERVER_ERROR,
87                ))
88            });
89
90    // Get Port from CLI Arguments or Subgraph Config
91    let port = match args.port.clone() {
92        Some(port) => port,
93        None => match subgraph_config.service.port.clone() {
94            Some(port) => port,
95            None => 0,
96        },
97    };
98
99    // Create Graceful Shutdown Channel
100    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
101
102    // If host is true, bind to 0.0.0.0
103    let host = match args.host.clone() {
104        true => {
105            let ip = local_ip().expect("Failed to get local IP address");
106            info!("🛝 Playground: http://{:?}:{:?}", ip, port);
107            [0, 0, 0, 0]
108        }
109        false => match subgraph_config.service.host {
110            Some(_host) => {
111                let ip = local_ip().expect("Failed to get local IP address");
112                info!("🛝 Playground: http://{:?}:{:?}", ip, port);
113                [0, 0, 0, 0]
114            }
115            None => {
116                info!("🛝 Playground: http://localhost:{:?}", port);
117                [127, 0, 0, 1]
118            }
119        },
120    };
121
122    // Return Server, Schema and Graceful Shutdown Channel
123    let (_addr, server) = warp::serve(routes).bind_with_graceful_shutdown((host, port), async {
124        rx.await.ok();
125    });
126
127    info!("❇️  Subgraph Service Started");
128
129    Ok((server, schema, tx))
130}