// finchers_juniper/execute/with_spawner.rs

1use finchers::endpoint;
2use finchers::endpoint::wrapper::Wrapper;
3use finchers::endpoint::{ApplyContext, ApplyResult, Endpoint, IntoEndpoint};
4use finchers::error::Error;
5
6use futures::future;
7use futures::future::Executor;
8use futures::sync::oneshot;
9use futures::{Future, Poll};
10use std::sync::Arc;
11
12use super::shared::SharedSchema;
13use request::{GraphQLRequest, GraphQLRequestEndpoint, GraphQLResponse, RequestFuture};
14
15/// Create a GraphQL executor from the specified `RootNode` and task executor.
16///
17/// The endpoint created by this wrapper will spawn a task which executes the GraphQL queries
18/// after receiving the request, by using the specified `Executor<T>`.
19pub fn with_spawner<S, Sp>(schema: S, spawner: Sp) -> WithSpawner<S, Sp>
20where
21    S: SharedSchema,
22    Sp: Executor<oneshot::Execute<GraphQLTask<S>>>,
23{
24    WithSpawner { schema, spawner }
25}
26
/// A GraphQL schema paired with the task executor that will run its queries.
///
/// Values of this type are created by `with_spawner`. Both fields are kept as-is
/// until the value is converted into an endpoint (or used to wrap another endpoint),
/// at which point the schema is moved behind an `Arc`.
#[derive(Debug)]
pub struct WithSpawner<S, Sp> {
    /// The schema handed to `with_spawner`.
    schema: S,
    /// The executor used to spawn one task per GraphQL request.
    spawner: Sp,
}
33
34impl<'a, S, Sp> IntoEndpoint<'a> for WithSpawner<S, Sp>
35where
36    S: SharedSchema<Context = ()>,
37    Sp: Executor<oneshot::Execute<GraphQLTask<S>>> + 'a,
38{
39    type Output = (GraphQLResponse,);
40    type Endpoint = WithSpawnerEndpoint<endpoint::Cloned<()>, S, Sp>;
41
42    fn into_endpoint(self) -> Self::Endpoint {
43        WithSpawnerEndpoint {
44            context: endpoint::cloned(()),
45            request: ::request::graphql_request(),
46            schema: Arc::new(self.schema),
47            spawner: self.spawner,
48        }
49    }
50}
51
52impl<'a, E, S, Sp> Wrapper<'a, E> for WithSpawner<S, Sp>
53where
54    E: Endpoint<'a, Output = (S::Context,)>,
55    S: SharedSchema,
56    Sp: Executor<oneshot::Execute<GraphQLTask<S>>> + 'a,
57{
58    type Output = (GraphQLResponse,);
59    type Endpoint = WithSpawnerEndpoint<E, S, Sp>;
60
61    fn wrap(self, endpoint: E) -> Self::Endpoint {
62        WithSpawnerEndpoint {
63            context: endpoint,
64            request: ::request::graphql_request(),
65            schema: Arc::new(self.schema),
66            spawner: self.spawner,
67        }
68    }
69}
70
71#[derive(Debug)]
72pub struct WithSpawnerEndpoint<E, S, Sp> {
73    context: E,
74    request: GraphQLRequestEndpoint,
75    schema: Arc<S>,
76    spawner: Sp,
77}
78
79impl<'a, E, S, Sp> Endpoint<'a> for WithSpawnerEndpoint<E, S, Sp>
80where
81    E: Endpoint<'a, Output = (S::Context,)>,
82    S: SharedSchema,
83    Sp: Executor<oneshot::Execute<GraphQLTask<S>>> + 'a,
84{
85    type Output = (GraphQLResponse,);
86    type Future = WithSpawnerFuture<'a, E, S, Sp>;
87
88    fn apply(&'a self, cx: &mut ApplyContext<'_>) -> ApplyResult<Self::Future> {
89        let context = self.context.apply(cx)?;
90        let request = self.request.apply(cx)?;
91        Ok(WithSpawnerFuture {
92            inner: context.join(request),
93            handle: None,
94            endpoint: self,
95        })
96    }
97}
98
99#[allow(missing_debug_implementations)]
100pub struct WithSpawnerFuture<'a, E: Endpoint<'a>, S: 'a, Sp: 'a> {
101    inner: future::Join<E::Future, RequestFuture<'a>>,
102    handle: Option<oneshot::SpawnHandle<GraphQLResponse, ()>>,
103    endpoint: &'a WithSpawnerEndpoint<E, S, Sp>,
104}
105
106impl<'a, E, S, Sp> Future for WithSpawnerFuture<'a, E, S, Sp>
107where
108    E: Endpoint<'a, Output = (S::Context,)>,
109    S: SharedSchema,
110    Sp: Executor<oneshot::Execute<GraphQLTask<S>>>,
111{
112    type Item = (GraphQLResponse,);
113    type Error = Error;
114
115    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
116        loop {
117            match self.handle {
118                Some(ref mut handle) => {
119                    return handle
120                        .poll()
121                        .map(|x| x.map(|response| (response,)))
122                        .map_err(|_| unreachable!());
123                }
124                None => {
125                    let ((context,), (request,)) = try_ready!(self.inner.poll());
126
127                    trace!("spawn a GraphQL task with the specified task executor");
128                    let schema = self.endpoint.schema.clone();
129                    let future = GraphQLTask {
130                        request,
131                        schema,
132                        context,
133                    };
134                    let handle = oneshot::spawn(future, &self.endpoint.spawner);
135                    self.handle = Some(handle);
136                }
137            }
138        }
139    }
140}
141
142// not a public API.
143#[allow(missing_debug_implementations)]
144pub struct GraphQLTask<S: SharedSchema> {
145    request: GraphQLRequest,
146    schema: Arc<S>,
147    context: S::Context,
148}
149
150impl<S: SharedSchema> Future for GraphQLTask<S> {
151    type Item = GraphQLResponse;
152    type Error = ();
153
154    #[inline]
155    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
156        Ok(self
157            .request
158            .execute(self.schema.as_root_node(), &self.context)
159            .into())
160    }
161}