conjure_runtime/blocking/
client.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::blocking::{body, BodyWriter, BodyWriterShim, ResponseBody};
15use crate::raw::{DefaultRawClient, RawBody, Service};
16use crate::{builder, Builder};
17use bytes::Bytes;
18use conjure_error::Error;
19use conjure_http::client::{self, AsyncClient, AsyncRequestBody, BoxAsyncWriteBody, RequestBody};
20use futures::channel::oneshot;
21use futures::executor;
22use http::{Request, Response};
23use once_cell::sync::OnceCell;
24use pin_project::pin_project;
25use std::future::Future;
26use std::pin::Pin;
27use std::task::{Context, Poll};
28use std::{error, io};
29use tokio::runtime::{self, Handle, Runtime};
30use zipkin::TraceContext;
31
32fn default_handle() -> io::Result<&'static Handle> {
33    static RUNTIME: OnceCell<Runtime> = OnceCell::new();
34    RUNTIME
35        .get_or_try_init(|| {
36            runtime::Builder::new_multi_thread()
37                .enable_all()
38                .thread_name("conjure-runtime")
39                .build()
40        })
41        .map(Runtime::handle)
42}
43
44/// A blocking HTTP client to a remote service.
45///
46/// It implements the Conjure `Client` trait, but also offers a "raw" request interface for use with services that don't
47/// provide Conjure service definitions.
48pub struct Client<T = DefaultRawClient> {
49    pub(crate) client: crate::Client<T>,
50    pub(crate) handle: Option<Handle>,
51}
52
53impl<T> Clone for Client<T> {
54    fn clone(&self) -> Self {
55        Client {
56            client: self.client.clone(),
57            handle: self.handle.clone(),
58        }
59    }
60}
61
62impl Client {
63    /// Returns a new `Builder` for clients.
64    #[inline]
65    pub fn builder() -> Builder<builder::ServiceStage> {
66        Builder::new()
67    }
68}
69
70impl<T> client::Service<Client<T>> for Client<T> {
71    fn new(client: Client<T>) -> Self {
72        client
73    }
74}
75
76impl<T, B> conjure_http::client::Client for Client<T>
77where
78    T: Service<Request<RawBody>, Response = Response<B>> + 'static + Sync + Send,
79    T::Error: Into<Box<dyn error::Error + Sync + Send>>,
80    B: http_body::Body<Data = Bytes> + 'static + Send,
81    B::Error: Into<Box<dyn error::Error + Sync + Send>>,
82{
83    type BodyWriter = BodyWriter;
84
85    type ResponseBody = ResponseBody<B>;
86
87    fn send(
88        &self,
89        req: Request<RequestBody<'_, Self::BodyWriter>>,
90    ) -> Result<Response<Self::ResponseBody>, Error> {
91        let mut streamer = None;
92        let req = req.map(|body| match body {
93            RequestBody::Empty => ShimBody::Empty,
94            RequestBody::Fixed(bytes) => ShimBody::Fixed(bytes),
95            RequestBody::Streaming(body_writer) => {
96                let shim = body::shim(body_writer);
97                streamer = Some(shim.1);
98                ShimBody::Streaming(shim.0)
99            }
100        });
101
102        let handle = match &self.handle {
103            Some(handle) => handle,
104            None => default_handle().map_err(Error::internal_safe)?,
105        };
106
107        let (sender, receiver) = oneshot::channel();
108
109        handle.spawn(ContextFuture::new({
110            let client = self.client.clone();
111            async move {
112                let (parts, body) = req.into_parts();
113                let body = match body {
114                    ShimBody::Empty => AsyncRequestBody::Empty,
115                    ShimBody::Fixed(bytes) => AsyncRequestBody::Fixed(bytes),
116                    ShimBody::Streaming(writer) => {
117                        AsyncRequestBody::Streaming(BoxAsyncWriteBody::new(writer))
118                    }
119                };
120                let req = Request::from_parts(parts, body);
121
122                let r = client.send(req).await;
123                let _ = sender.send(r);
124            }
125        }));
126
127        if let Some(streamer) = streamer {
128            streamer.stream();
129        }
130
131        match executor::block_on(receiver) {
132            Ok(Ok(r)) => Ok(r.map(|body| ResponseBody::new(body, handle.clone()))),
133            Ok(Err(e)) => Err(e.with_backtrace()),
134            Err(e) => Err(Error::internal_safe(e)),
135        }
136    }
137}
138
139enum ShimBody {
140    Empty,
141    Fixed(Bytes),
142    Streaming(BodyWriterShim),
143}
144
145#[pin_project]
146struct ContextFuture<F> {
147    #[pin]
148    future: F,
149    context: Option<TraceContext>,
150}
151
152impl<F> ContextFuture<F>
153where
154    F: Future,
155{
156    fn new(future: F) -> ContextFuture<F> {
157        ContextFuture {
158            future,
159            context: zipkin::current(),
160        }
161    }
162}
163
164impl<F> Future for ContextFuture<F>
165where
166    F: Future,
167{
168    type Output = F::Output;
169
170    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
171        let this = self.project();
172        let _guard = this.context.map(zipkin::set_current);
173        this.future.poll(cx)
174    }
175}