// pdk_classy/grpc/client.rs
use std::convert::Infallible;
6use std::rc::Rc;
7use std::time::Duration;
8
9use crate::proxy_wasm::types::Status;
10
11use crate::client::Service;
12use crate::extract::context::ConfigureContext;
13use crate::extract::{extractability, FromContext};
14use crate::host::grpc::GrpcHost;
15
16use crate::http_constants::DEFAULT_TIMEOUT;
17use crate::reactor::root::RootReactor;
18
19use crate::grpc::call::GrpcCall;
20use crate::grpc::error::GrpcCallError;
21use crate::grpc::transport::GrpcCallId;
22
23use super::codec::{BytesCodec, Codec, Encoder};
24use super::GrpcProxyError;
25
26pub struct GrpcClient {
28 reactor: Rc<RootReactor>,
29 grpc_host: Rc<dyn GrpcHost>,
30}
31
32impl FromContext<ConfigureContext, extractability::Transitive> for GrpcClient {
33 type Error = Infallible;
34
35 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
36 Ok(Self {
37 reactor: Rc::clone(&context.root_reactor),
38 grpc_host: Rc::clone(&context.grpc_host),
39 })
40 }
41}
42
43impl GrpcClient {
44 pub fn request<'a>(&'a self, upstream: &'a Service) -> GrpcServiceBuilder<'a> {
46 GrpcServiceBuilder::new(self.reactor.clone(), self.grpc_host.clone(), upstream)
47 }
48}
49
50pub struct GrpcServiceBuilder<'a> {
52 reactor: Rc<RootReactor>,
53 host: Rc<dyn GrpcHost>,
54 upstream: &'a Service,
55}
56
57impl<'a> GrpcServiceBuilder<'a> {
58 pub(super) fn new(
59 reactor: Rc<RootReactor>,
60 host: Rc<dyn GrpcHost>,
61 upstream: &'a Service,
62 ) -> Self {
63 Self {
64 reactor,
65 host,
66 upstream,
67 }
68 }
69
70 pub fn service(self, service: &'a str) -> GrpcMethodBuilder<'a> {
72 GrpcMethodBuilder {
73 reactor: self.reactor,
74 host: self.host,
75 upstream: self.upstream,
76 service,
77 }
78 }
79}
80
81pub struct GrpcMethodBuilder<'a> {
83 reactor: Rc<RootReactor>,
84 host: Rc<dyn GrpcHost>,
85 upstream: &'a Service,
86 service: &'a str,
87}
88
89impl<'a> GrpcMethodBuilder<'a> {
91 pub fn method(self, method: &'a str) -> GrpcCallBuilder<'a, BytesCodec> {
93 GrpcCallBuilder {
94 reactor: self.reactor,
95 host: self.host,
96 upstream: self.upstream,
97 service: self.service,
98 method,
99 initial_metadata: Vec::new(),
100 timeout: DEFAULT_TIMEOUT,
101 codec: BytesCodec,
102 }
103 }
104}
105
106pub struct GrpcCallBuilder<'a, C> {
108 reactor: Rc<RootReactor>,
109 host: Rc<dyn GrpcHost>,
110 upstream: &'a Service,
111 service: &'a str,
112 method: &'a str,
113 initial_metadata: Vec<(&'a str, &'a [u8])>,
114 timeout: Duration,
115 codec: C,
116}
117
118impl<'a, C> GrpcCallBuilder<'a, C>
119where
120 C: Codec,
121{
122 pub fn initial_metadata(self, initial_metadata: Vec<(&'a str, &'a [u8])>) -> Self {
124 Self {
125 initial_metadata,
126 ..self
127 }
128 }
129
130 pub fn add_initial_metadata(mut self, name: &'a str, values: &'a [u8]) -> Self {
132 self.initial_metadata.push((name, values));
133 self
134 }
135
136 pub fn timeout(self, timeout: Duration) -> Self {
138 Self { timeout, ..self }
139 }
140
141 pub fn send(self, message: &<C::Encoder as Encoder>::Input) -> GrpcCall<C::Decoder> {
143 let (mut encoder, decoder) = self.codec.into_components();
144 let upstream = self.upstream.cluster_name();
145 let result = match encoder.encode(message) {
146 Ok(bytes) => self
147 .host
148 .dispatch_grpc_call(
149 upstream,
150 self.service,
151 self.method,
152 self.initial_metadata,
153 Some(bytes.as_ref()),
154 self.timeout,
155 )
156 .map(GrpcCallId::new)
157 .map_err(|s| {
158 if matches!(s, Status::ParseFailure) {
159 GrpcCallError::InvalidUpstream(upstream.to_string())
160 } else {
161 GrpcCallError::Proxy(GrpcProxyError::new(s))
162 }
163 }),
164 Err(e) => Err(GrpcCallError::Encode(e.into())),
165 };
166 GrpcCall::new(self.reactor, self.host, decoder, result)
167 }
168
169 pub(super) fn codec<T: Codec>(self, codec: T) -> GrpcCallBuilder<'a, T> {
170 GrpcCallBuilder {
171 reactor: self.reactor,
172 host: self.host,
173 upstream: self.upstream,
174 service: self.service,
175 method: self.method,
176 initial_metadata: self.initial_metadata,
177 timeout: self.timeout,
178 codec,
179 }
180 }
181}