conjure_runtime/blocking/
client.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::blocking::{body, BodyWriter, BodyWriterShim, ResponseBody};
15use crate::{builder, Builder};
16use bytes::Bytes;
17use conjure_error::Error;
18use conjure_http::client::{
19    self, AsyncClient, AsyncRequestBody, BoxAsyncWriteBody, ConjureRuntime, RequestBody,
20};
21use futures::channel::oneshot;
22use futures::executor;
23use http::{Request, Response};
24use once_cell::sync::OnceCell;
25use pin_project::pin_project;
26use std::future::Future;
27use std::io;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31use tokio::runtime::{self, Handle, Runtime};
32use zipkin::TraceContext;
33
34fn default_handle() -> io::Result<&'static Handle> {
35    static RUNTIME: OnceCell<Runtime> = OnceCell::new();
36    RUNTIME
37        .get_or_try_init(|| {
38            runtime::Builder::new_multi_thread()
39                .enable_all()
40                .thread_name("conjure-runtime")
41                .build()
42        })
43        .map(Runtime::handle)
44}
45
46/// A blocking HTTP client to a remote service.
47///
48/// It implements the Conjure `Client` trait, but also offers a "raw" request interface for use with services that don't
49/// provide Conjure service definitions.
50#[derive(Clone)]
51pub struct Client {
52    pub(crate) client: crate::Client,
53    pub(crate) handle: Option<Handle>,
54}
55
56impl Client {
57    /// Returns a new `Builder` for clients.
58    #[inline]
59    pub fn builder() -> Builder<builder::ServiceStage> {
60        Builder::new()
61    }
62}
63
64impl client::Service<Client> for Client {
65    fn new(client: Client, _: &Arc<ConjureRuntime>) -> Self {
66        client
67    }
68}
69
70impl conjure_http::client::Client for Client {
71    type BodyWriter = BodyWriter;
72
73    type ResponseBody = ResponseBody;
74
75    fn send(
76        &self,
77        req: Request<RequestBody<'_, Self::BodyWriter>>,
78    ) -> Result<Response<Self::ResponseBody>, Error> {
79        let mut streamer = None;
80        let req = req.map(|body| match body {
81            RequestBody::Empty => ShimBody::Empty,
82            RequestBody::Fixed(bytes) => ShimBody::Fixed(bytes),
83            RequestBody::Streaming(body_writer) => {
84                let shim = body::shim(body_writer);
85                streamer = Some(shim.1);
86                ShimBody::Streaming(shim.0)
87            }
88        });
89
90        let handle = match &self.handle {
91            Some(handle) => handle,
92            None => default_handle().map_err(Error::internal_safe)?,
93        };
94
95        let (sender, receiver) = oneshot::channel();
96
97        handle.spawn(ContextFuture::new({
98            let client = self.client.clone();
99            async move {
100                let (parts, body) = req.into_parts();
101                let body = match body {
102                    ShimBody::Empty => AsyncRequestBody::Empty,
103                    ShimBody::Fixed(bytes) => AsyncRequestBody::Fixed(bytes),
104                    ShimBody::Streaming(writer) => {
105                        AsyncRequestBody::Streaming(BoxAsyncWriteBody::new(writer))
106                    }
107                };
108                let req = Request::from_parts(parts, body);
109
110                let r = client.send(req).await;
111                let _ = sender.send(r);
112            }
113        }));
114
115        if let Some(streamer) = streamer {
116            streamer.stream();
117        }
118
119        match executor::block_on(receiver) {
120            Ok(Ok(r)) => Ok(r.map(|body| ResponseBody::new(body, handle.clone()))),
121            Ok(Err(e)) => Err(e.with_backtrace()),
122            Err(e) => Err(Error::internal_safe(e)),
123        }
124    }
125}
126
127enum ShimBody {
128    Empty,
129    Fixed(Bytes),
130    Streaming(BodyWriterShim),
131}
132
133#[pin_project]
134struct ContextFuture<F> {
135    #[pin]
136    future: F,
137    context: Option<TraceContext>,
138}
139
140impl<F> ContextFuture<F>
141where
142    F: Future,
143{
144    fn new(future: F) -> ContextFuture<F> {
145        ContextFuture {
146            future,
147            context: zipkin::current(),
148        }
149    }
150}
151
152impl<F> Future for ContextFuture<F>
153where
154    F: Future,
155{
156    type Output = F::Output;
157
158    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
159        let this = self.project();
160        let _guard = this.context.map(zipkin::set_current);
161        this.future.poll(cx)
162    }
163}