conjure_runtime/blocking/
client.rs1use crate::blocking::{body, BodyWriter, BodyWriterShim, ResponseBody};
15use crate::raw::{DefaultRawClient, RawBody, Service};
16use crate::{builder, Builder};
17use bytes::Bytes;
18use conjure_error::Error;
19use conjure_http::client::{self, AsyncClient, AsyncRequestBody, BoxAsyncWriteBody, RequestBody};
20use futures::channel::oneshot;
21use futures::executor;
22use http::{Request, Response};
23use once_cell::sync::OnceCell;
24use pin_project::pin_project;
25use std::future::Future;
26use std::pin::Pin;
27use std::task::{Context, Poll};
28use std::{error, io};
29use tokio::runtime::{self, Handle, Runtime};
30use zipkin::TraceContext;
31
32fn default_handle() -> io::Result<&'static Handle> {
33 static RUNTIME: OnceCell<Runtime> = OnceCell::new();
34 RUNTIME
35 .get_or_try_init(|| {
36 runtime::Builder::new_multi_thread()
37 .enable_all()
38 .thread_name("conjure-runtime")
39 .build()
40 })
41 .map(Runtime::handle)
42}
43
44pub struct Client<T = DefaultRawClient> {
49 pub(crate) client: crate::Client<T>,
50 pub(crate) handle: Option<Handle>,
51}
52
53impl<T> Clone for Client<T> {
54 fn clone(&self) -> Self {
55 Client {
56 client: self.client.clone(),
57 handle: self.handle.clone(),
58 }
59 }
60}
61
62impl Client {
63 #[inline]
65 pub fn builder() -> Builder<builder::ServiceStage> {
66 Builder::new()
67 }
68}
69
70impl<T> client::Service<Client<T>> for Client<T> {
71 fn new(client: Client<T>) -> Self {
72 client
73 }
74}
75
76impl<T, B> conjure_http::client::Client for Client<T>
77where
78 T: Service<Request<RawBody>, Response = Response<B>> + 'static + Sync + Send,
79 T::Error: Into<Box<dyn error::Error + Sync + Send>>,
80 B: http_body::Body<Data = Bytes> + 'static + Send,
81 B::Error: Into<Box<dyn error::Error + Sync + Send>>,
82{
83 type BodyWriter = BodyWriter;
84
85 type ResponseBody = ResponseBody<B>;
86
87 fn send(
88 &self,
89 req: Request<RequestBody<'_, Self::BodyWriter>>,
90 ) -> Result<Response<Self::ResponseBody>, Error> {
91 let mut streamer = None;
92 let req = req.map(|body| match body {
93 RequestBody::Empty => ShimBody::Empty,
94 RequestBody::Fixed(bytes) => ShimBody::Fixed(bytes),
95 RequestBody::Streaming(body_writer) => {
96 let shim = body::shim(body_writer);
97 streamer = Some(shim.1);
98 ShimBody::Streaming(shim.0)
99 }
100 });
101
102 let handle = match &self.handle {
103 Some(handle) => handle,
104 None => default_handle().map_err(Error::internal_safe)?,
105 };
106
107 let (sender, receiver) = oneshot::channel();
108
109 handle.spawn(ContextFuture::new({
110 let client = self.client.clone();
111 async move {
112 let (parts, body) = req.into_parts();
113 let body = match body {
114 ShimBody::Empty => AsyncRequestBody::Empty,
115 ShimBody::Fixed(bytes) => AsyncRequestBody::Fixed(bytes),
116 ShimBody::Streaming(writer) => {
117 AsyncRequestBody::Streaming(BoxAsyncWriteBody::new(writer))
118 }
119 };
120 let req = Request::from_parts(parts, body);
121
122 let r = client.send(req).await;
123 let _ = sender.send(r);
124 }
125 }));
126
127 if let Some(streamer) = streamer {
128 streamer.stream();
129 }
130
131 match executor::block_on(receiver) {
132 Ok(Ok(r)) => Ok(r.map(|body| ResponseBody::new(body, handle.clone()))),
133 Ok(Err(e)) => Err(e.with_backtrace()),
134 Err(e) => Err(Error::internal_safe(e)),
135 }
136 }
137}
138
139enum ShimBody {
140 Empty,
141 Fixed(Bytes),
142 Streaming(BodyWriterShim),
143}
144
145#[pin_project]
146struct ContextFuture<F> {
147 #[pin]
148 future: F,
149 context: Option<TraceContext>,
150}
151
152impl<F> ContextFuture<F>
153where
154 F: Future,
155{
156 fn new(future: F) -> ContextFuture<F> {
157 ContextFuture {
158 future,
159 context: zipkin::current(),
160 }
161 }
162}
163
164impl<F> Future for ContextFuture<F>
165where
166 F: Future,
167{
168 type Output = F::Output;
169
170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
171 let this = self.project();
172 let _guard = this.context.map(zipkin::set_current);
173 this.future.poll(cx)
174 }
175}