conjure_runtime/blocking/
client.rs1use crate::blocking::{body, BodyWriter, BodyWriterShim, ResponseBody};
15use crate::{builder, Builder};
16use bytes::Bytes;
17use conjure_error::Error;
18use conjure_http::client::{
19 self, AsyncClient, AsyncRequestBody, BoxAsyncWriteBody, ConjureRuntime, RequestBody,
20};
21use futures::channel::oneshot;
22use futures::executor;
23use http::{Request, Response};
24use once_cell::sync::OnceCell;
25use pin_project::pin_project;
26use std::future::Future;
27use std::io;
28use std::pin::Pin;
29use std::sync::Arc;
30use std::task::{Context, Poll};
31use tokio::runtime::{self, Handle, Runtime};
32use zipkin::TraceContext;
33
34fn default_handle() -> io::Result<&'static Handle> {
35 static RUNTIME: OnceCell<Runtime> = OnceCell::new();
36 RUNTIME
37 .get_or_try_init(|| {
38 runtime::Builder::new_multi_thread()
39 .enable_all()
40 .thread_name("conjure-runtime")
41 .build()
42 })
43 .map(Runtime::handle)
44}
45
46#[derive(Clone)]
51pub struct Client {
52 pub(crate) client: crate::Client,
53 pub(crate) handle: Option<Handle>,
54}
55
56impl Client {
57 #[inline]
59 pub fn builder() -> Builder<builder::ServiceStage> {
60 Builder::new()
61 }
62}
63
64impl client::Service<Client> for Client {
65 fn new(client: Client, _: &Arc<ConjureRuntime>) -> Self {
66 client
67 }
68}
69
70impl conjure_http::client::Client for Client {
71 type BodyWriter = BodyWriter;
72
73 type ResponseBody = ResponseBody;
74
75 fn send(
76 &self,
77 req: Request<RequestBody<'_, Self::BodyWriter>>,
78 ) -> Result<Response<Self::ResponseBody>, Error> {
79 let mut streamer = None;
80 let req = req.map(|body| match body {
81 RequestBody::Empty => ShimBody::Empty,
82 RequestBody::Fixed(bytes) => ShimBody::Fixed(bytes),
83 RequestBody::Streaming(body_writer) => {
84 let shim = body::shim(body_writer);
85 streamer = Some(shim.1);
86 ShimBody::Streaming(shim.0)
87 }
88 });
89
90 let handle = match &self.handle {
91 Some(handle) => handle,
92 None => default_handle().map_err(Error::internal_safe)?,
93 };
94
95 let (sender, receiver) = oneshot::channel();
96
97 handle.spawn(ContextFuture::new({
98 let client = self.client.clone();
99 async move {
100 let (parts, body) = req.into_parts();
101 let body = match body {
102 ShimBody::Empty => AsyncRequestBody::Empty,
103 ShimBody::Fixed(bytes) => AsyncRequestBody::Fixed(bytes),
104 ShimBody::Streaming(writer) => {
105 AsyncRequestBody::Streaming(BoxAsyncWriteBody::new(writer))
106 }
107 };
108 let req = Request::from_parts(parts, body);
109
110 let r = client.send(req).await;
111 let _ = sender.send(r);
112 }
113 }));
114
115 if let Some(streamer) = streamer {
116 streamer.stream();
117 }
118
119 match executor::block_on(receiver) {
120 Ok(Ok(r)) => Ok(r.map(|body| ResponseBody::new(body, handle.clone()))),
121 Ok(Err(e)) => Err(e.with_backtrace()),
122 Err(e) => Err(Error::internal_safe(e)),
123 }
124 }
125}
126
127enum ShimBody {
128 Empty,
129 Fixed(Bytes),
130 Streaming(BodyWriterShim),
131}
132
133#[pin_project]
134struct ContextFuture<F> {
135 #[pin]
136 future: F,
137 context: Option<TraceContext>,
138}
139
140impl<F> ContextFuture<F>
141where
142 F: Future,
143{
144 fn new(future: F) -> ContextFuture<F> {
145 ContextFuture {
146 future,
147 context: zipkin::current(),
148 }
149 }
150}
151
152impl<F> Future for ContextFuture<F>
153where
154 F: Future,
155{
156 type Output = F::Output;
157
158 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
159 let this = self.project();
160 let _guard = this.context.map(zipkin::set_current);
161 this.future.poll(cx)
162 }
163}