finchers_juniper/execute/
with_spawner.rs1use finchers::endpoint;
2use finchers::endpoint::wrapper::Wrapper;
3use finchers::endpoint::{ApplyContext, ApplyResult, Endpoint, IntoEndpoint};
4use finchers::error::Error;
5
6use futures::future;
7use futures::future::Executor;
8use futures::sync::oneshot;
9use futures::{Future, Poll};
10use std::sync::Arc;
11
12use super::shared::SharedSchema;
13use request::{GraphQLRequest, GraphQLRequestEndpoint, GraphQLResponse, RequestFuture};
14
15pub fn with_spawner<S, Sp>(schema: S, spawner: Sp) -> WithSpawner<S, Sp>
20where
21 S: SharedSchema,
22 Sp: Executor<oneshot::Execute<GraphQLTask<S>>>,
23{
24 WithSpawner { schema, spawner }
25}
26
27#[allow(missing_docs)]
28#[derive(Debug)]
29pub struct WithSpawner<S, Sp> {
30 schema: S,
31 spawner: Sp,
32}
33
34impl<'a, S, Sp> IntoEndpoint<'a> for WithSpawner<S, Sp>
35where
36 S: SharedSchema<Context = ()>,
37 Sp: Executor<oneshot::Execute<GraphQLTask<S>>> + 'a,
38{
39 type Output = (GraphQLResponse,);
40 type Endpoint = WithSpawnerEndpoint<endpoint::Cloned<()>, S, Sp>;
41
42 fn into_endpoint(self) -> Self::Endpoint {
43 WithSpawnerEndpoint {
44 context: endpoint::cloned(()),
45 request: ::request::graphql_request(),
46 schema: Arc::new(self.schema),
47 spawner: self.spawner,
48 }
49 }
50}
51
52impl<'a, E, S, Sp> Wrapper<'a, E> for WithSpawner<S, Sp>
53where
54 E: Endpoint<'a, Output = (S::Context,)>,
55 S: SharedSchema,
56 Sp: Executor<oneshot::Execute<GraphQLTask<S>>> + 'a,
57{
58 type Output = (GraphQLResponse,);
59 type Endpoint = WithSpawnerEndpoint<E, S, Sp>;
60
61 fn wrap(self, endpoint: E) -> Self::Endpoint {
62 WithSpawnerEndpoint {
63 context: endpoint,
64 request: ::request::graphql_request(),
65 schema: Arc::new(self.schema),
66 spawner: self.spawner,
67 }
68 }
69}
70
71#[derive(Debug)]
72pub struct WithSpawnerEndpoint<E, S, Sp> {
73 context: E,
74 request: GraphQLRequestEndpoint,
75 schema: Arc<S>,
76 spawner: Sp,
77}
78
79impl<'a, E, S, Sp> Endpoint<'a> for WithSpawnerEndpoint<E, S, Sp>
80where
81 E: Endpoint<'a, Output = (S::Context,)>,
82 S: SharedSchema,
83 Sp: Executor<oneshot::Execute<GraphQLTask<S>>> + 'a,
84{
85 type Output = (GraphQLResponse,);
86 type Future = WithSpawnerFuture<'a, E, S, Sp>;
87
88 fn apply(&'a self, cx: &mut ApplyContext<'_>) -> ApplyResult<Self::Future> {
89 let context = self.context.apply(cx)?;
90 let request = self.request.apply(cx)?;
91 Ok(WithSpawnerFuture {
92 inner: context.join(request),
93 handle: None,
94 endpoint: self,
95 })
96 }
97}
98
99#[allow(missing_debug_implementations)]
100pub struct WithSpawnerFuture<'a, E: Endpoint<'a>, S: 'a, Sp: 'a> {
101 inner: future::Join<E::Future, RequestFuture<'a>>,
102 handle: Option<oneshot::SpawnHandle<GraphQLResponse, ()>>,
103 endpoint: &'a WithSpawnerEndpoint<E, S, Sp>,
104}
105
106impl<'a, E, S, Sp> Future for WithSpawnerFuture<'a, E, S, Sp>
107where
108 E: Endpoint<'a, Output = (S::Context,)>,
109 S: SharedSchema,
110 Sp: Executor<oneshot::Execute<GraphQLTask<S>>>,
111{
112 type Item = (GraphQLResponse,);
113 type Error = Error;
114
115 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
116 loop {
117 match self.handle {
118 Some(ref mut handle) => {
119 return handle
120 .poll()
121 .map(|x| x.map(|response| (response,)))
122 .map_err(|_| unreachable!());
123 }
124 None => {
125 let ((context,), (request,)) = try_ready!(self.inner.poll());
126
127 trace!("spawn a GraphQL task with the specified task executor");
128 let schema = self.endpoint.schema.clone();
129 let future = GraphQLTask {
130 request,
131 schema,
132 context,
133 };
134 let handle = oneshot::spawn(future, &self.endpoint.spawner);
135 self.handle = Some(handle);
136 }
137 }
138 }
139 }
140}
141
142#[allow(missing_debug_implementations)]
144pub struct GraphQLTask<S: SharedSchema> {
145 request: GraphQLRequest,
146 schema: Arc<S>,
147 context: S::Context,
148}
149
150impl<S: SharedSchema> Future for GraphQLTask<S> {
151 type Item = GraphQLResponse;
152 type Error = ();
153
154 #[inline]
155 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
156 Ok(self
157 .request
158 .execute(self.schema.as_root_node(), &self.context)
159 .into())
160 }
161}