1#![cfg_attr(any(doc, test), doc = include_str!("../README.md"))]
2#![cfg_attr(not(any(doc, test)), doc = env!("CARGO_PKG_NAME"))]
3#![cfg_attr(docsrs, feature(doc_cfg))]
4#![cfg_attr(
5 test,
6 expect(unused_crate_dependencies, reason = "examples and integration tests")
7)]
8
9mod for_minimal_versions_check_only {
11 use http_body_util as _;
12 #[cfg(test)]
13 use hyper_util as _;
14}
15
16mod response;
17#[cfg(feature = "subscriptions")]
18pub mod subscriptions;
19
20use std::{collections::HashMap, str, sync::Arc};
21
22use derive_more::with_trait::Display;
23use juniper::{
24 ScalarValue,
25 http::{GraphQLBatchRequest, GraphQLRequest},
26};
27use tokio::task;
28use warp::{
29 Filter,
30 body::{self, BodyDeserializeError},
31 http::{self, StatusCode},
32 hyper::body::Bytes,
33 query,
34 reject::{self, Reject, Rejection},
35 reply::{self, Reply},
36};
37
38use self::response::JuniperResponse;
39
40pub fn make_graphql_filter<S, Query, Mutation, Subscription, CtxT, CtxErr>(
164 schema: impl Into<Arc<juniper::RootNode<Query, Mutation, Subscription, S>>>,
165 context_extractor: impl Filter<Extract = (CtxT,), Error = CtxErr> + Send + Sync + 'static,
166) -> impl Filter<Extract = (reply::Response,), Error = Rejection> + Clone + Send
167where
168 Query: juniper::GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
169 Query::TypeInfo: Send + Sync,
170 Mutation: juniper::GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
171 Mutation::TypeInfo: Send + Sync,
172 Subscription: juniper::GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
173 Subscription::TypeInfo: Send + Sync,
174 CtxT: Send + Sync + 'static,
175 CtxErr: Into<Rejection>,
176 S: ScalarValue + Send + Sync + 'static,
177{
178 let schema = schema.into();
179 let context_extractor = context_extractor.boxed();
187
188 get_query_extractor::<S>()
189 .or(post_json_extractor::<S>())
190 .unify()
191 .or(post_graphql_extractor::<S>())
192 .unify()
193 .and(warp::any().map(move || schema.clone()))
194 .and(context_extractor)
195 .then(graphql_handler::<Query, Mutation, Subscription, CtxT, S>)
196 .recover(handle_rejects)
197 .unify()
198}
199
200pub fn make_graphql_filter_sync<S, Query, Mutation, Subscription, CtxT, CtxErr>(
207 schema: impl Into<Arc<juniper::RootNode<Query, Mutation, Subscription, S>>>,
208 context_extractor: impl Filter<Extract = (CtxT,), Error = CtxErr> + Send + Sync + 'static,
209) -> impl Filter<Extract = (reply::Response,), Error = Rejection> + Clone + Send
210where
211 Query: juniper::GraphQLType<S, Context = CtxT> + Send + Sync + 'static,
212 Query::TypeInfo: Send + Sync,
213 Mutation: juniper::GraphQLType<S, Context = CtxT> + Send + Sync + 'static,
214 Mutation::TypeInfo: Send + Sync,
215 Subscription: juniper::GraphQLType<S, Context = CtxT> + Send + Sync + 'static,
216 Subscription::TypeInfo: Send + Sync,
217 CtxT: Send + Sync + 'static,
218 CtxErr: Into<Rejection>,
219 S: ScalarValue + Send + Sync + 'static,
220{
221 let schema = schema.into();
222 let context_extractor = context_extractor.boxed();
230
231 get_query_extractor::<S>()
232 .or(post_json_extractor::<S>())
233 .unify()
234 .or(post_graphql_extractor::<S>())
235 .unify()
236 .and(warp::any().map(move || schema.clone()))
237 .and(context_extractor)
238 .then(graphql_handler_sync::<Query, Mutation, Subscription, CtxT, S>)
239 .recover(handle_rejects)
240 .unify()
241}
242
243async fn graphql_handler<Query, Mutation, Subscription, CtxT, S>(
246 req: GraphQLBatchRequest<S>,
247 schema: Arc<juniper::RootNode<Query, Mutation, Subscription, S>>,
248 context: CtxT,
249) -> reply::Response
250where
251 Query: juniper::GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
252 Query::TypeInfo: Send + Sync,
253 Mutation: juniper::GraphQLTypeAsync<S, Context = CtxT> + Send + 'static,
254 Mutation::TypeInfo: Send + Sync,
255 Subscription: juniper::GraphQLSubscriptionType<S, Context = CtxT> + Send + 'static,
256 Subscription::TypeInfo: Send + Sync,
257 CtxT: Send + Sync + 'static,
258 S: ScalarValue + Send + Sync + 'static,
259{
260 let resp = req.execute(&*schema, &context).await;
261 JuniperResponse(resp).into_response()
262}
263
264async fn graphql_handler_sync<Query, Mutation, Subscription, CtxT, S>(
268 req: GraphQLBatchRequest<S>,
269 schema: Arc<juniper::RootNode<Query, Mutation, Subscription, S>>,
270 context: CtxT,
271) -> reply::Response
272where
273 Query: juniper::GraphQLType<S, Context = CtxT> + Send + Sync + 'static,
274 Query::TypeInfo: Send + Sync,
275 Mutation: juniper::GraphQLType<S, Context = CtxT> + Send + Sync + 'static,
276 Mutation::TypeInfo: Send + Sync,
277 Subscription: juniper::GraphQLType<S, Context = CtxT> + Send + Sync + 'static,
278 Subscription::TypeInfo: Send + Sync,
279 CtxT: Send + Sync + 'static,
280 S: ScalarValue + Send + Sync + 'static,
281{
282 task::spawn_blocking(move || req.execute_sync(&*schema, &context))
283 .await
284 .map(|resp| JuniperResponse(resp).into_response())
285 .unwrap_or_else(|e| BlockingError(e).into_response())
286}
287
288fn post_json_extractor<S>()
290-> impl Filter<Extract = (GraphQLBatchRequest<S>,), Error = Rejection> + Clone + Send
291where
292 S: ScalarValue + Send,
293{
294 warp::post().and(body::json())
295}
296
297fn post_graphql_extractor<S>()
299-> impl Filter<Extract = (GraphQLBatchRequest<S>,), Error = Rejection> + Clone + Send
300where
301 S: ScalarValue + Send,
302{
303 warp::post()
304 .and(body::bytes())
305 .and_then(async |body: Bytes| {
306 let query = str::from_utf8(body.as_ref())
307 .map_err(|e| reject::custom(FilterError::NonUtf8Body(e)))?;
308 let req = GraphQLRequest::new(query.into(), None, None);
309 Ok::<GraphQLBatchRequest<S>, Rejection>(GraphQLBatchRequest::Single(req))
310 })
311}
312
313fn get_query_extractor<S>()
315-> impl Filter<Extract = (GraphQLBatchRequest<S>,), Error = Rejection> + Clone + Send
316where
317 S: ScalarValue + Send,
318{
319 warp::get()
320 .and(query::query())
321 .and_then(async |mut qry: HashMap<String, String>| {
322 let req = GraphQLRequest::new(
323 qry.remove("query")
324 .ok_or_else(|| reject::custom(FilterError::MissingPathQuery))?,
325 qry.remove("operation_name"),
326 qry.remove("variables")
327 .map(|vs| serde_json::from_str(&vs))
328 .transpose()
329 .map_err(|e| reject::custom(FilterError::InvalidPathVariables(e)))?,
330 );
331 Ok::<GraphQLBatchRequest<S>, Rejection>(GraphQLBatchRequest::Single(req))
332 })
333}
334
335async fn handle_rejects(rej: Rejection) -> Result<reply::Response, Rejection> {
337 let (status, msg) = if let Some(e) = rej.find::<FilterError>() {
338 (StatusCode::BAD_REQUEST, e.to_string())
339 } else if let Some(e) = rej.find::<warp::reject::InvalidQuery>() {
340 (StatusCode::BAD_REQUEST, e.to_string())
341 } else if let Some(e) = rej.find::<BodyDeserializeError>() {
342 (StatusCode::BAD_REQUEST, e.to_string())
343 } else {
344 return Err(rej);
345 };
346
347 Ok(http::Response::builder()
348 .status(status)
349 .body(msg.into())
350 .unwrap())
351}
352
353#[derive(Debug, Display)]
355enum FilterError {
356 #[display("Missing GraphQL `query` string in query parameters")]
358 MissingPathQuery,
359
360 #[display("Failed to deserialize GraphQL `variables` from JSON: {_0}")]
362 InvalidPathVariables(serde_json::Error),
363
364 #[display("Request body is not a valid UTF-8 string: {_0}")]
366 NonUtf8Body(str::Utf8Error),
367}
368
369impl Reject for FilterError {}
370
371#[derive(Debug)]
373struct BlockingError(task::JoinError);
374
375impl Reply for BlockingError {
376 fn into_response(self) -> reply::Response {
377 http::Response::builder()
378 .status(StatusCode::INTERNAL_SERVER_ERROR)
379 .body(format!("Failed to execute synchronous GraphQL request: {}", self.0).into())
380 .unwrap_or_else(|e| {
381 unreachable!("cannot build `reply::Response` out of `BlockingError`: {e}")
382 })
383 }
384}
385
386pub fn graphiql_filter(
408 graphql_endpoint_url: &'static str,
409 subscriptions_endpoint: Option<&'static str>,
410) -> warp::filters::BoxedFilter<(http::Response<Vec<u8>>,)> {
411 warp::any()
412 .map(move || graphiql_response(graphql_endpoint_url, subscriptions_endpoint))
413 .boxed()
414}
415
416fn graphiql_response(
417 graphql_endpoint_url: &'static str,
418 subscriptions_endpoint: Option<&'static str>,
419) -> http::Response<Vec<u8>> {
420 http::Response::builder()
421 .header("content-type", "text/html;charset=utf-8")
422 .body(
423 juniper::http::graphiql::graphiql_source(graphql_endpoint_url, subscriptions_endpoint)
424 .into_bytes(),
425 )
426 .expect("response is valid")
427}
428
429pub fn playground_filter(
431 graphql_endpoint_url: &'static str,
432 subscriptions_endpoint_url: Option<&'static str>,
433) -> warp::filters::BoxedFilter<(http::Response<Vec<u8>>,)> {
434 warp::any()
435 .map(move || playground_response(graphql_endpoint_url, subscriptions_endpoint_url))
436 .boxed()
437}
438
439fn playground_response(
440 graphql_endpoint_url: &'static str,
441 subscriptions_endpoint_url: Option<&'static str>,
442) -> http::Response<Vec<u8>> {
443 http::Response::builder()
444 .header("content-type", "text/html;charset=utf-8")
445 .body(
446 juniper::http::playground::playground_source(
447 graphql_endpoint_url,
448 subscriptions_endpoint_url,
449 )
450 .into_bytes(),
451 )
452 .expect("response is valid")
453}
454
455#[cfg(test)]
456mod tests {
457 mod make_graphql_filter {
458 use std::future;
459
460 use juniper::{
461 EmptyMutation, EmptySubscription,
462 http::GraphQLBatchRequest,
463 tests::fixtures::starwars::schema::{Database, Query},
464 };
465 use warp::{
466 Filter as _, Reply, http,
467 reject::{self, Reject},
468 test::request,
469 };
470
471 use super::super::make_graphql_filter;
472
473 #[tokio::test]
474 async fn post_json() {
475 type Schema =
476 juniper::RootNode<Query, EmptyMutation<Database>, EmptySubscription<Database>>;
477
478 let schema = Schema::new(Query, EmptyMutation::new(), EmptySubscription::new());
479
480 let db = warp::any().map(Database::new);
481 let filter = warp::path("graphql2").and(make_graphql_filter(schema, db));
482
483 let response = request()
484 .method("POST")
485 .path("/graphql2")
486 .header("accept", "application/json")
487 .header("content-type", "application/json")
488 .body(r#"{"variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }"}"#)
489 .reply(&filter)
490 .await;
491
492 assert_eq!(response.status(), http::StatusCode::OK);
493 assert_eq!(
494 response.headers().get("content-type").unwrap(),
495 "application/json",
496 );
497 assert_eq!(
498 String::from_utf8(response.body().to_vec()).unwrap(),
499 r#"{"data":{"hero":{"name":"R2-D2"}}}"#,
500 );
501 }
502
503 #[tokio::test]
504 async fn rejects_fast_when_context_extractor_fails() {
505 use std::sync::{
506 Arc,
507 atomic::{AtomicBool, Ordering},
508 };
509
510 #[derive(Clone, Copy, Debug)]
511 struct ExtractionError;
512
513 impl Reject for ExtractionError {}
514
515 impl warp::Reply for ExtractionError {
516 fn into_response(self) -> warp::reply::Response {
517 http::StatusCode::IM_A_TEAPOT.into_response()
518 }
519 }
520
521 type Schema =
522 juniper::RootNode<Query, EmptyMutation<Database>, EmptySubscription<Database>>;
523
524 let schema = Schema::new(Query, EmptyMutation::new(), EmptySubscription::new());
525
526 let is_called = Arc::new(AtomicBool::new(false));
530 let context_extractor = warp::any().and_then(move || {
531 future::ready(if is_called.swap(true, Ordering::Relaxed) {
532 Ok(Database::new())
533 } else {
534 Err(reject::custom(ExtractionError))
535 })
536 });
537
538 let filter = warp::path("graphql")
539 .and(make_graphql_filter(schema, context_extractor))
540 .recover(async |rejection: warp::reject::Rejection| {
541 rejection
542 .find::<ExtractionError>()
543 .map(|e| e.into_response())
544 .ok_or(rejection)
545 });
546
547 let resp = request()
548 .method("POST")
549 .path("/graphql")
550 .header("accept", "application/json")
551 .header("content-type", "application/json")
552 .body(r#"{"variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }"}"#)
553 .reply(&filter)
554 .await;
555
556 assert_eq!(
557 resp.status(),
558 http::StatusCode::IM_A_TEAPOT,
559 "response: {resp:#?}",
560 );
561 }
562
563 #[tokio::test]
564 async fn batch_requests() {
565 type Schema =
566 juniper::RootNode<Query, EmptyMutation<Database>, EmptySubscription<Database>>;
567
568 let schema = Schema::new(Query, EmptyMutation::new(), EmptySubscription::new());
569
570 let db = warp::any().map(Database::new);
571 let filter = warp::path("graphql2").and(make_graphql_filter(schema, db));
572
573 let response = request()
574 .method("POST")
575 .path("/graphql2")
576 .header("accept", "application/json")
577 .header("content-type", "application/json")
578 .body(
579 r#"[
580 {"variables": null, "query": "{ hero(episode: NEW_HOPE) { name } }"},
581 {"variables": null, "query": "{ hero(episode: EMPIRE) { id name } }"}
582 ]"#,
583 )
584 .reply(&filter)
585 .await;
586
587 assert_eq!(response.status(), http::StatusCode::OK);
588 assert_eq!(
589 String::from_utf8(response.body().to_vec()).unwrap(),
590 r#"[{"data":{"hero":{"name":"R2-D2"}}},{"data":{"hero":{"id":"1000","name":"Luke Skywalker"}}}]"#,
591 );
592 assert_eq!(
593 response.headers().get("content-type").unwrap(),
594 "application/json",
595 );
596 }
597
598 #[test]
599 fn batch_request_deserialization_can_fail() {
600 let json = r#"blah"#;
601 let result: Result<GraphQLBatchRequest, _> = serde_json::from_str(json);
602
603 assert!(result.is_err());
604 }
605 }
606
607 mod graphiql_filter {
608 use warp::{Filter as _, http, test::request};
609
610 use super::super::{graphiql_filter, graphiql_response};
611
612 #[test]
613 fn response_does_not_panic() {
614 graphiql_response("/abcd", None);
615 }
616
617 #[tokio::test]
618 async fn endpoint_matches() {
619 let filter = warp::get()
620 .and(warp::path("graphiql"))
621 .and(graphiql_filter("/graphql", None));
622 let result = request()
623 .method("GET")
624 .path("/graphiql")
625 .header("accept", "text/html")
626 .filter(&filter)
627 .await;
628
629 assert!(result.is_ok());
630 }
631
632 #[tokio::test]
633 async fn returns_graphiql_source() {
634 let filter = warp::get()
635 .and(warp::path("dogs-api"))
636 .and(warp::path("graphiql"))
637 .and(graphiql_filter("/dogs-api/graphql", None));
638 let response = request()
639 .method("GET")
640 .path("/dogs-api/graphiql")
641 .header("accept", "text/html")
642 .reply(&filter)
643 .await;
644
645 assert_eq!(response.status(), http::StatusCode::OK);
646 assert_eq!(
647 response.headers().get("content-type").unwrap(),
648 "text/html;charset=utf-8"
649 );
650 let body = String::from_utf8(response.body().to_vec()).unwrap();
651
652 assert!(body.contains("const JUNIPER_URL = '/dogs-api/graphql';"));
653 }
654
655 #[tokio::test]
656 async fn endpoint_with_subscription_matches() {
657 let filter = warp::get().and(warp::path("graphiql")).and(graphiql_filter(
658 "/graphql",
659 Some("ws:://localhost:8080/subscriptions"),
660 ));
661 let result = request()
662 .method("GET")
663 .path("/graphiql")
664 .header("accept", "text/html")
665 .filter(&filter)
666 .await;
667
668 assert!(result.is_ok());
669 }
670 }
671
672 mod playground_filter {
673 use warp::{Filter as _, http, test::request};
674
675 use super::super::playground_filter;
676
677 #[tokio::test]
678 async fn endpoint_matches() {
679 let filter = warp::get()
680 .and(warp::path("playground"))
681 .and(playground_filter("/graphql", Some("/subscripitons")));
682
683 let result = request()
684 .method("GET")
685 .path("/playground")
686 .header("accept", "text/html")
687 .filter(&filter)
688 .await;
689
690 assert!(result.is_ok());
691 }
692
693 #[tokio::test]
694 async fn returns_playground_source() {
695 let filter = warp::get()
696 .and(warp::path("dogs-api"))
697 .and(warp::path("playground"))
698 .and(playground_filter(
699 "/dogs-api/graphql",
700 Some("/dogs-api/subscriptions"),
701 ));
702 let response = request()
703 .method("GET")
704 .path("/dogs-api/playground")
705 .header("accept", "text/html")
706 .reply(&filter)
707 .await;
708
709 assert_eq!(response.status(), http::StatusCode::OK);
710 assert_eq!(
711 response.headers().get("content-type").unwrap(),
712 "text/html;charset=utf-8"
713 );
714
715 let body = String::from_utf8(response.body().to_vec()).unwrap();
716
717 assert!(body.contains(
718 "endpoint: '/dogs-api/graphql', subscriptionEndpoint: '/dogs-api/subscriptions'",
719 ));
720 }
721 }
722}