proxy_sdk/
grpc_call.rs

1use std::{
2    fmt,
3    ops::{Bound, RangeBounds},
4    time::Duration,
5};
6
7use derive_builder::Builder;
8
9use crate::{
10    downcast_box::DowncastBox,
11    hostcalls::{self, BufferType, MapType},
12    log_concern,
13    upstream::Upstream,
14    RootContext, Status,
15};
16
17/// Outbound GRPC call
18#[derive(Builder)]
19#[builder(setter(into))]
20#[builder(pattern = "owned")]
21#[allow(clippy::type_complexity)]
22pub struct GrpcCall<'a> {
23    /// Upstream cluster to send the request to.
24    pub upstream: Upstream<'a>,
25    /// The GRPC service to call.
26    pub service: &'a str,
27    /// The GRPC service method to call.
28    pub method: &'a str,
29    /// Initial GRPC metadata to send with the request.
30    #[builder(setter(each(name = "metadata")), default)]
31    pub initial_metadata: Vec<(&'a str, &'a [u8])>,
32    /// An optional request body to send with the request.
33    #[builder(setter(strip_option, into), default)]
34    pub message: Option<&'a [u8]>,
35    /// A timeout on waiting for a response. Default is 10 seconds.
36    #[builder(setter(strip_option, into), default)]
37    pub timeout: Option<Duration>,
38    /// Callback to call when a response has arrived.
39    #[builder(setter(custom), default)]
40    pub callback: Option<Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>>,
41}
42
43impl<'a> GrpcCallBuilder<'a> {
44    /// Set a response callback
45    pub fn callback<R: RootContext + 'static>(
46        mut self,
47        callback: impl FnOnce(&mut R, &GrpcCallResponse) + 'static,
48    ) -> Self {
49        self.callback = Some(Some(Box::new(move |root, resp| {
50            callback(
51                root.as_any_mut().downcast_mut().expect("invalid root type"),
52                resp,
53            )
54        })));
55        self
56    }
57}
58
59impl<'a> GrpcCall<'a> {
60    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
61
62    /// Sends this `GrpcCall` over the network.
63    pub fn dispatch(self) -> Result<GrpcCancelHandle, Status> {
64        let token = hostcalls::dispatch_grpc_call(
65            &self.upstream.0,
66            self.service,
67            self.method,
68            &self.initial_metadata,
69            self.message,
70            self.timeout.unwrap_or(Self::DEFAULT_TIMEOUT),
71        )?;
72        if let Some(callback) = self.callback {
73            crate::dispatcher::register_grpc_callback(token, callback);
74        }
75        Ok(GrpcCancelHandle(token))
76    }
77}
78
/// Handle to an in-flight GRPC call, usable to cancel the request.
///
/// Wraps the numeric token that identifies the call.
#[derive(Debug)]
pub struct GrpcCancelHandle(u32);
82
83impl GrpcCancelHandle {
84    /// Attempts to cancel the GRPC call
85    pub fn cancel(&self) {
86        hostcalls::cancel_grpc_call(self.0).ok();
87    }
88}
89
90impl fmt::Display for GrpcCancelHandle {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        self.0.fmt(f)
93    }
94}
95
96impl PartialEq<u32> for GrpcCancelHandle {
97    fn eq(&self, other: &u32) -> bool {
98        self.0 == *other
99    }
100}
101
102impl PartialEq<GrpcCancelHandle> for u32 {
103    fn eq(&self, other: &GrpcCancelHandle) -> bool {
104        other == self
105    }
106}
107
/// GRPC status codes, copied from the `tonic` crate.
///
/// Codes `0` through `16` have dedicated variants; any other numeric value is
/// carried in [`GrpcCode::Other`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u32)]
pub enum GrpcCode {
    /// Success: the operation completed.
    Ok = 0,

    /// The operation was cancelled.
    Cancelled = 1,

    /// An unknown error occurred.
    Unknown = 2,

    /// The client supplied an invalid argument.
    InvalidArgument = 3,

    /// The deadline expired before the operation could complete.
    DeadlineExceeded = 4,

    /// A requested entity was not found.
    NotFound = 5,

    /// An entity that was being created already exists.
    AlreadyExists = 6,

    /// The caller lacks permission to execute the operation.
    PermissionDenied = 7,

    /// A resource has been exhausted.
    ResourceExhausted = 8,

    /// The system is not in the state the operation requires.
    FailedPrecondition = 9,

    /// The operation was aborted.
    Aborted = 10,

    /// The operation was attempted past the valid range.
    OutOfRange = 11,

    /// The operation is not implemented or not supported.
    Unimplemented = 12,

    /// An internal error occurred.
    Internal = 13,

    /// The service is currently unavailable.
    Unavailable = 14,

    /// Unrecoverable data loss or corruption.
    DataLoss = 15,

    /// The request lacks valid authentication credentials.
    Unauthenticated = 16,

    /// A code without a dedicated variant; holds the raw numeric value.
    Other(u32),
}
166
167impl From<u32> for GrpcCode {
168    fn from(value: u32) -> GrpcCode {
169        match value {
170            0 => GrpcCode::Ok,
171            1 => GrpcCode::Cancelled,
172            2 => GrpcCode::Unknown,
173            3 => GrpcCode::InvalidArgument,
174            4 => GrpcCode::DeadlineExceeded,
175            5 => GrpcCode::NotFound,
176            6 => GrpcCode::AlreadyExists,
177            7 => GrpcCode::PermissionDenied,
178            8 => GrpcCode::ResourceExhausted,
179            9 => GrpcCode::FailedPrecondition,
180            10 => GrpcCode::Aborted,
181            11 => GrpcCode::OutOfRange,
182            12 => GrpcCode::Unimplemented,
183            13 => GrpcCode::Internal,
184            14 => GrpcCode::Unavailable,
185            15 => GrpcCode::DataLoss,
186            16 => GrpcCode::Unauthenticated,
187            x => GrpcCode::Other(x),
188        }
189    }
190}
191
192impl PartialEq<u32> for GrpcCode {
193    fn eq(&self, other: &u32) -> bool {
194        *self == Self::from(*other)
195    }
196}
197
198impl PartialEq<GrpcCode> for u32 {
199    fn eq(&self, other: &GrpcCode) -> bool {
200        other == self
201    }
202}
203
204/// Response type for [`GrpcCall::callback`]
205pub struct GrpcCallResponse {
206    handle_id: u32,
207    status_code: GrpcCode,
208    body_size: usize,
209    message: Option<String>,
210}
211
212impl GrpcCallResponse {
213    pub(crate) fn new(
214        token_id: u32,
215        status_code: GrpcCode,
216        message: Option<String>,
217        body_size: usize,
218    ) -> Self {
219        Self {
220            handle_id: token_id,
221            status_code,
222            body_size,
223            message,
224        }
225    }
226
227    /// GRPC handle ID of the response
228    pub fn handle_id(&self) -> u32 {
229        self.handle_id
230    }
231
232    /// GRPC status code of the response
233    pub fn status_code(&self) -> GrpcCode {
234        self.status_code
235    }
236
237    /// Optional GRPC status message of the response
238    pub fn status_message(&self) -> Option<&str> {
239        self.message.as_deref()
240    }
241
242    /// Total size of the response body
243    pub fn body_size(&self) -> usize {
244        self.body_size
245    }
246
247    /// Get all response headers
248    pub fn headers(&self) -> Vec<(String, Vec<u8>)> {
249        log_concern(
250            "grpc-call-headers",
251            hostcalls::get_map(MapType::HttpCallResponseHeaders),
252        )
253        .unwrap_or_default()
254    }
255
256    /// Get a specific response header
257    pub fn header(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
258        log_concern(
259            "grpc-call-header",
260            hostcalls::get_map_value(MapType::HttpCallResponseHeaders, name.as_ref()),
261        )
262    }
263
264    /// Get a range of the response body
265    pub fn body(&self, range: impl RangeBounds<usize>) -> Option<Vec<u8>> {
266        let start = match range.start_bound() {
267            Bound::Included(x) => *x,
268            Bound::Excluded(x) => x.saturating_sub(1),
269            Bound::Unbounded => 0,
270        };
271        let size = match range.end_bound() {
272            Bound::Included(x) => *x + 1,
273            Bound::Excluded(x) => *x,
274            Bound::Unbounded => self.body_size,
275        }
276        .min(self.body_size)
277        .saturating_sub(start);
278        log_concern(
279            "grpc-call-body",
280            hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, size),
281        )
282    }
283
284    /// Get the entire response body
285    pub fn full_body(&self) -> Option<Vec<u8>> {
286        self.body(..)
287    }
288
289    /// Get all response trailers
290    pub fn trailers(&self) -> Vec<(String, Vec<u8>)> {
291        log_concern(
292            "grpc-call-trailers",
293            hostcalls::get_map(MapType::HttpCallResponseTrailers),
294        )
295        .unwrap_or_default()
296    }
297
298    /// Get a specific response trailer
299    pub fn trailer(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
300        log_concern(
301            "grpc-call-trailer",
302            hostcalls::get_map_value(MapType::HttpCallResponseTrailers, name.as_ref()),
303        )
304    }
305}