1use std::{
2 fmt,
3 ops::{Bound, RangeBounds},
4 time::Duration,
5};
6
7use derive_builder::Builder;
8
9use crate::{
10 downcast_box::DowncastBox,
11 hostcalls::{self, BufferType, MapType},
12 log_concern,
13 upstream::Upstream,
14 RootContext, Status,
15};
16
17#[derive(Builder)]
19#[builder(setter(into))]
20#[builder(pattern = "owned")]
21#[allow(clippy::type_complexity)]
22pub struct GrpcCall<'a> {
23 pub upstream: Upstream<'a>,
25 pub service: &'a str,
27 pub method: &'a str,
29 #[builder(setter(each(name = "metadata")), default)]
31 pub initial_metadata: Vec<(&'a str, &'a [u8])>,
32 #[builder(setter(strip_option, into), default)]
34 pub message: Option<&'a [u8]>,
35 #[builder(setter(strip_option, into), default)]
37 pub timeout: Option<Duration>,
38 #[builder(setter(custom), default)]
40 pub callback: Option<Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>>,
41}
42
43impl<'a> GrpcCallBuilder<'a> {
44 pub fn callback<R: RootContext + 'static>(
46 mut self,
47 callback: impl FnOnce(&mut R, &GrpcCallResponse) + 'static,
48 ) -> Self {
49 self.callback = Some(Some(Box::new(move |root, resp| {
50 callback(
51 root.as_any_mut().downcast_mut().expect("invalid root type"),
52 resp,
53 )
54 })));
55 self
56 }
57}
58
59impl<'a> GrpcCall<'a> {
60 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
61
62 pub fn dispatch(self) -> Result<GrpcCancelHandle, Status> {
64 let token = hostcalls::dispatch_grpc_call(
65 &self.upstream.0,
66 self.service,
67 self.method,
68 &self.initial_metadata,
69 self.message,
70 self.timeout.unwrap_or(Self::DEFAULT_TIMEOUT),
71 )?;
72 if let Some(callback) = self.callback {
73 crate::dispatcher::register_grpc_callback(token, callback);
74 }
75 Ok(GrpcCancelHandle(token))
76 }
77}
78
79#[derive(Debug)]
81pub struct GrpcCancelHandle(u32);
82
83impl GrpcCancelHandle {
84 pub fn cancel(&self) {
86 hostcalls::cancel_grpc_call(self.0).ok();
87 }
88}
89
90impl fmt::Display for GrpcCancelHandle {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 self.0.fmt(f)
93 }
94}
95
96impl PartialEq<u32> for GrpcCancelHandle {
97 fn eq(&self, other: &u32) -> bool {
98 self.0 == *other
99 }
100}
101
102impl PartialEq<GrpcCancelHandle> for u32 {
103 fn eq(&self, other: &GrpcCancelHandle) -> bool {
104 other == self
105 }
106}
107
108#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
110#[repr(u32)]
111pub enum GrpcCode {
112 Ok = 0,
114
115 Cancelled = 1,
117
118 Unknown = 2,
120
121 InvalidArgument = 3,
123
124 DeadlineExceeded = 4,
126
127 NotFound = 5,
129
130 AlreadyExists = 6,
132
133 PermissionDenied = 7,
135
136 ResourceExhausted = 8,
138
139 FailedPrecondition = 9,
141
142 Aborted = 10,
144
145 OutOfRange = 11,
147
148 Unimplemented = 12,
150
151 Internal = 13,
153
154 Unavailable = 14,
156
157 DataLoss = 15,
159
160 Unauthenticated = 16,
162
163 Other(u32),
165}
166
167impl From<u32> for GrpcCode {
168 fn from(value: u32) -> GrpcCode {
169 match value {
170 0 => GrpcCode::Ok,
171 1 => GrpcCode::Cancelled,
172 2 => GrpcCode::Unknown,
173 3 => GrpcCode::InvalidArgument,
174 4 => GrpcCode::DeadlineExceeded,
175 5 => GrpcCode::NotFound,
176 6 => GrpcCode::AlreadyExists,
177 7 => GrpcCode::PermissionDenied,
178 8 => GrpcCode::ResourceExhausted,
179 9 => GrpcCode::FailedPrecondition,
180 10 => GrpcCode::Aborted,
181 11 => GrpcCode::OutOfRange,
182 12 => GrpcCode::Unimplemented,
183 13 => GrpcCode::Internal,
184 14 => GrpcCode::Unavailable,
185 15 => GrpcCode::DataLoss,
186 16 => GrpcCode::Unauthenticated,
187 x => GrpcCode::Other(x),
188 }
189 }
190}
191
192impl PartialEq<u32> for GrpcCode {
193 fn eq(&self, other: &u32) -> bool {
194 *self == Self::from(*other)
195 }
196}
197
198impl PartialEq<GrpcCode> for u32 {
199 fn eq(&self, other: &GrpcCode) -> bool {
200 other == self
201 }
202}
203
204pub struct GrpcCallResponse {
206 handle_id: u32,
207 status_code: GrpcCode,
208 body_size: usize,
209 message: Option<String>,
210}
211
212impl GrpcCallResponse {
213 pub(crate) fn new(
214 token_id: u32,
215 status_code: GrpcCode,
216 message: Option<String>,
217 body_size: usize,
218 ) -> Self {
219 Self {
220 handle_id: token_id,
221 status_code,
222 body_size,
223 message,
224 }
225 }
226
227 pub fn handle_id(&self) -> u32 {
229 self.handle_id
230 }
231
232 pub fn status_code(&self) -> GrpcCode {
234 self.status_code
235 }
236
237 pub fn status_message(&self) -> Option<&str> {
239 self.message.as_deref()
240 }
241
242 pub fn body_size(&self) -> usize {
244 self.body_size
245 }
246
247 pub fn headers(&self) -> Vec<(String, Vec<u8>)> {
249 log_concern(
250 "grpc-call-headers",
251 hostcalls::get_map(MapType::HttpCallResponseHeaders),
252 )
253 .unwrap_or_default()
254 }
255
256 pub fn header(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
258 log_concern(
259 "grpc-call-header",
260 hostcalls::get_map_value(MapType::HttpCallResponseHeaders, name.as_ref()),
261 )
262 }
263
264 pub fn body(&self, range: impl RangeBounds<usize>) -> Option<Vec<u8>> {
266 let start = match range.start_bound() {
267 Bound::Included(x) => *x,
268 Bound::Excluded(x) => x.saturating_sub(1),
269 Bound::Unbounded => 0,
270 };
271 let size = match range.end_bound() {
272 Bound::Included(x) => *x + 1,
273 Bound::Excluded(x) => *x,
274 Bound::Unbounded => self.body_size,
275 }
276 .min(self.body_size)
277 .saturating_sub(start);
278 log_concern(
279 "grpc-call-body",
280 hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, size),
281 )
282 }
283
284 pub fn full_body(&self) -> Option<Vec<u8>> {
286 self.body(..)
287 }
288
289 pub fn trailers(&self) -> Vec<(String, Vec<u8>)> {
291 log_concern(
292 "grpc-call-trailers",
293 hostcalls::get_map(MapType::HttpCallResponseTrailers),
294 )
295 .unwrap_or_default()
296 }
297
298 pub fn trailer(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
300 log_concern(
301 "grpc-call-trailer",
302 hostcalls::get_map_value(MapType::HttpCallResponseTrailers, name.as_ref()),
303 )
304 }
305}