pdk_classy/grpc/
client.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5use std::convert::Infallible;
6use std::rc::Rc;
7use std::time::Duration;
8
9use crate::proxy_wasm::types::Status;
10
11use crate::client::Service;
12use crate::extract::context::ConfigureContext;
13use crate::extract::{extractability, FromContext};
14use crate::host::grpc::GrpcHost;
15
16use crate::http_constants::DEFAULT_TIMEOUT;
17use crate::reactor::root::RootReactor;
18
19use crate::grpc::call::GrpcCall;
20use crate::grpc::error::GrpcCallError;
21use crate::grpc::transport::GrpcCallId;
22
23use super::codec::{BytesCodec, Codec, Encoder};
24use super::GrpcProxyError;
25
26/// An asynchronous gRPC client to make calls with.
27pub struct GrpcClient {
28    reactor: Rc<RootReactor>,
29    grpc_host: Rc<dyn GrpcHost>,
30}
31
32impl FromContext<ConfigureContext, extractability::Transitive> for GrpcClient {
33    type Error = Infallible;
34
35    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
36        Ok(Self {
37            reactor: Rc::clone(&context.root_reactor),
38            grpc_host: Rc::clone(&context.grpc_host),
39        })
40    }
41}
42
43impl GrpcClient {
44    /// Creates a gRPC request that will forward the whole response to the caller.
45    pub fn request<'a>(&'a self, upstream: &'a Service) -> GrpcServiceBuilder<'a> {
46        GrpcServiceBuilder::new(self.reactor.clone(), self.grpc_host.clone(), upstream)
47    }
48}
49
50/// A gRPC request builder for gRPC services.
51pub struct GrpcServiceBuilder<'a> {
52    reactor: Rc<RootReactor>,
53    host: Rc<dyn GrpcHost>,
54    upstream: &'a Service,
55}
56
57impl<'a> GrpcServiceBuilder<'a> {
58    pub(super) fn new(
59        reactor: Rc<RootReactor>,
60        host: Rc<dyn GrpcHost>,
61        upstream: &'a Service,
62    ) -> Self {
63        Self {
64            reactor,
65            host,
66            upstream,
67        }
68    }
69
70    /// Sets the gRPC service.
71    pub fn service(self, service: &'a str) -> GrpcMethodBuilder<'a> {
72        GrpcMethodBuilder {
73            reactor: self.reactor,
74            host: self.host,
75            upstream: self.upstream,
76            service,
77        }
78    }
79}
80
81/// A gRPC request builder for gRPC methods.
82pub struct GrpcMethodBuilder<'a> {
83    reactor: Rc<RootReactor>,
84    host: Rc<dyn GrpcHost>,
85    upstream: &'a Service,
86    service: &'a str,
87}
88
89/// A gRPC request builder for gRPC methods.
90impl<'a> GrpcMethodBuilder<'a> {
91    /// Sets the gRPC method.
92    pub fn method(self, method: &'a str) -> GrpcCallBuilder<'a, BytesCodec> {
93        GrpcCallBuilder {
94            reactor: self.reactor,
95            host: self.host,
96            upstream: self.upstream,
97            service: self.service,
98            method,
99            initial_metadata: Vec::new(),
100            timeout: DEFAULT_TIMEOUT,
101            codec: BytesCodec,
102        }
103    }
104}
105
106/// A gRPC call builder.
107pub struct GrpcCallBuilder<'a, C> {
108    reactor: Rc<RootReactor>,
109    host: Rc<dyn GrpcHost>,
110    upstream: &'a Service,
111    service: &'a str,
112    method: &'a str,
113    initial_metadata: Vec<(&'a str, &'a [u8])>,
114    timeout: Duration,
115    codec: C,
116}
117
118impl<'a, C> GrpcCallBuilder<'a, C>
119where
120    C: Codec,
121{
122    /// Sets the initial metadata.
123    pub fn initial_metadata(self, initial_metadata: Vec<(&'a str, &'a [u8])>) -> Self {
124        Self {
125            initial_metadata,
126            ..self
127        }
128    }
129
130    /// Adds initial metadata.
131    pub fn add_initial_metadata(mut self, name: &'a str, values: &'a [u8]) -> Self {
132        self.initial_metadata.push((name, values));
133        self
134    }
135
136    /// Sets the request timeout.
137    pub fn timeout(self, timeout: Duration) -> Self {
138        Self { timeout, ..self }
139    }
140
141    /// Sends a oneshot gRPC request to the upstream.
142    pub fn send(self, message: &<C::Encoder as Encoder>::Input) -> GrpcCall<C::Decoder> {
143        let (mut encoder, decoder) = self.codec.into_components();
144        let upstream = self.upstream.cluster_name();
145        let result = match encoder.encode(message) {
146            Ok(bytes) => self
147                .host
148                .dispatch_grpc_call(
149                    upstream,
150                    self.service,
151                    self.method,
152                    self.initial_metadata,
153                    Some(bytes.as_ref()),
154                    self.timeout,
155                )
156                .map(GrpcCallId::new)
157                .map_err(|s| {
158                    if matches!(s, Status::ParseFailure) {
159                        GrpcCallError::InvalidUpstream(upstream.to_string())
160                    } else {
161                        GrpcCallError::Proxy(GrpcProxyError::new(s))
162                    }
163                }),
164            Err(e) => Err(GrpcCallError::Encode(e.into())),
165        };
166        GrpcCall::new(self.reactor, self.host, decoder, result)
167    }
168
169    pub(super) fn codec<T: Codec>(self, codec: T) -> GrpcCallBuilder<'a, T> {
170        GrpcCallBuilder {
171            reactor: self.reactor,
172            host: self.host,
173            upstream: self.upstream,
174            service: self.service,
175            method: self.method,
176            initial_metadata: self.initial_metadata,
177            timeout: self.timeout,
178            codec,
179        }
180    }
181}