use crate::configuration::subgraph::SubGraphConfig;
use async_graphql::{
dynamic::Schema,
http::{playground_source, GraphQLPlaygroundConfig},
};
use async_graphql_warp::{GraphQLBadRequest, GraphQLResponse};
use http::{HeaderMap, StatusCode};
use local_ip_address::local_ip;
use log::{info, trace};
use std::convert::Infallible;
use warp::{http::Response as HttpResponse, Filter, Future, Rejection};
// Public modules of the crate.
// `cli_args` provides the `CliArgs` type accepted by `run`.
pub mod cli_args;
// `configuration` contains `subgraph::SubGraphConfig` and `cors_config::CorsConfig`,
// both used by `run`.
pub mod configuration;
// `data_sources` provides `DataSources`, which `run` initialises at start-up.
pub mod data_sources;
pub mod filter_operator;
// `graphql` provides `schema::ServiceSchema`, which `run` uses to build the schema.
pub mod graphql;
pub mod resolver_type;
pub mod scalar_option;
pub mod sql_value;
pub mod traits;
pub mod utils;
pub async fn run(
args: cli_args::CliArgs,
subgraph_config: SubGraphConfig,
) -> Result<
(
impl Future<Output = ()>,
Schema,
tokio::sync::oneshot::Sender<()>,
),
std::io::Error,
> {
info!("⛵ Starting Subgraph Service");
trace!("Service Arguments: {:?}", args);
let data_sources = data_sources::DataSources::init(
subgraph_config.service.data_sources.clone(),
&args,
&subgraph_config,
)
.await;
let schema = graphql::schema::ServiceSchema::new(subgraph_config.clone(), data_sources).build();
let graphql_post = warp::path("graphql")
.and(async_graphql_warp::graphql(schema.clone()))
.and(warp::header::headers_cloned())
.and_then(
|(schema, request): (Schema, async_graphql::Request), headers: HeaderMap| async move {
trace!("Request: {:?}", request);
let dynamic_response = schema.execute(request.data(headers)).await;
let response = GraphQLResponse::from(dynamic_response);
Ok::<_, Infallible>(response)
},
);
let graphql_playground = warp::path("playground").and(warp::get()).map(|| {
HttpResponse::builder().body(playground_source(GraphQLPlaygroundConfig::new("/graphql")))
});
let cors = configuration::cors_config::CorsConfig::create_cors(subgraph_config.clone());
let routes =
graphql_playground
.or(graphql_post)
.with(cors)
.recover(|err: Rejection| async move {
if let Some(GraphQLBadRequest(err)) = err.find() {
return Ok::<_, Infallible>(warp::reply::with_status(
err.to_string(),
StatusCode::BAD_REQUEST,
));
}
Ok(warp::reply::with_status(
"INTERNAL_SERVER_ERROR".to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
))
});
let port = match args.port.clone() {
Some(port) => port,
None => match subgraph_config.service.port.clone() {
Some(port) => port,
None => 0,
},
};
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let host = match args.host.clone() {
true => {
let ip = local_ip().expect("Failed to get local IP address");
info!("🛝 Playground: http://{:?}:{:?}", ip, port);
[0, 0, 0, 0]
}
false => match subgraph_config.service.host {
Some(_host) => {
let ip = local_ip().expect("Failed to get local IP address");
info!("🛝 Playground: http://{:?}:{:?}", ip, port);
[0, 0, 0, 0]
}
None => {
info!("🛝 Playground: http://localhost:{:?}", port);
[127, 0, 0, 1]
}
},
};
let (_addr, server) = warp::serve(routes).bind_with_graceful_shutdown((host, port), async {
rx.await.ok();
});
info!("❇️ Subgraph Service Started");
Ok((server, schema, tx))
}