use std::{
fmt,
ops::{Bound, RangeBounds},
time::Duration,
};
use derive_builder::Builder;
use crate::{
downcast_box::DowncastBox,
hostcalls::{self, BufferType, MapType},
log_concern,
upstream::Upstream,
RootContext, Status,
};
/// Description of a single outbound gRPC call.
///
/// Values are assembled through the `GrpcCallBuilder` generated by
/// `derive_builder` (owned pattern, `into` setters) and sent with
/// `GrpcCall::dispatch`.
#[derive(Builder)]
#[builder(setter(into))]
#[builder(pattern = "owned")]
// NOTE(review): presumably required because of the boxed closure type of `callback`.
#[allow(clippy::type_complexity)]
pub struct GrpcCall<'a> {
/// Upstream the call is sent to; `dispatch` hands its inner value to the host call.
pub upstream: Upstream<'a>,
/// Service name passed to the host when dispatching.
pub service: &'a str,
/// Method name passed to the host when dispatching.
pub method: &'a str,
/// Initial metadata as `(name, value)` pairs. The builder's `metadata` setter
/// adds one pair per invocation; defaults to an empty list.
#[builder(setter(each(name = "metadata")), default)]
pub initial_metadata: Vec<(&'a str, &'a [u8])>,
/// Optional request message bytes; `None` when not set on the builder.
#[builder(setter(strip_option, into), default)]
pub message: Option<&'a [u8]>,
/// Optional call timeout. `dispatch` substitutes `GrpcCall::DEFAULT_TIMEOUT`
/// (10 seconds) when this is `None`.
#[builder(setter(strip_option, into), default)]
pub timeout: Option<Duration>,
/// Optional one-shot response handler, installed via the hand-written
/// `GrpcCallBuilder::callback` setter. `dispatch` registers it with the
/// dispatcher under the token of the dispatched call.
#[builder(setter(custom), default)]
pub callback: Option<Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>>,
}
impl<'a> GrpcCallBuilder<'a> {
    /// Installs a one-shot handler that is invoked with the root context
    /// (downcast to the concrete type `R`) and the call's response.
    ///
    /// The handler is wrapped in a type-erased closure so it can be stored in
    /// the builder regardless of `R`.
    ///
    /// # Panics
    ///
    /// The stored closure panics with `"invalid root type"` when the root
    /// context it receives cannot be downcast to `R`.
    pub fn callback<R>(
        mut self,
        callback: impl FnOnce(&mut R, &GrpcCallResponse) + 'static,
    ) -> Self
    where
        R: RootContext + 'static,
    {
        self.callback = Some(Some(Box::new(move |root, response| {
            // Recover the concrete root type the caller asked for.
            let typed_root: &mut R = root
                .as_any_mut()
                .downcast_mut()
                .expect("invalid root type");
            callback(typed_root, response)
        })));
        self
    }
}
impl<'a> GrpcCall<'a> {
    /// Timeout applied by `dispatch` when the call does not specify one.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Sends the call to the host and returns a handle wrapping the token the
    /// host assigned to it.
    ///
    /// When a callback was configured it is registered with the dispatcher
    /// under that token after the host call succeeded.
    ///
    /// # Errors
    ///
    /// Propagates the `Status` returned by `hostcalls::dispatch_grpc_call`;
    /// in that case no callback is registered.
    pub fn dispatch(self) -> Result<GrpcCancelHandle, Status> {
        let Self {
            upstream,
            service,
            method,
            initial_metadata,
            message,
            timeout,
            callback,
        } = self;

        let effective_timeout = timeout.unwrap_or(Self::DEFAULT_TIMEOUT);
        let token = hostcalls::dispatch_grpc_call(
            &upstream.0,
            service,
            method,
            &initial_metadata,
            message,
            effective_timeout,
        )?;

        if let Some(on_response) = callback {
            crate::dispatcher::register_grpc_callback(token, on_response);
        }

        Ok(GrpcCancelHandle(token))
    }
}
/// Handle to a dispatched gRPC call, wrapping the `u32` token obtained when
/// the call was dispatched. Used to cancel the call.
#[derive(Debug)]
pub struct GrpcCancelHandle(u32);
impl GrpcCancelHandle {
    /// Asks the host to cancel the call identified by this handle.
    ///
    /// Best effort: whatever `hostcalls::cancel_grpc_call` returns is
    /// discarded, so a failed cancellation is not reported to the caller.
    pub fn cancel(&self) {
        let _ = hostcalls::cancel_grpc_call(self.0);
    }
}
/// Formats the handle as its underlying numeric token.
impl fmt::Display for GrpcCancelHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Call the trait function by path: method syntax (`self.0.fmt(f)`)
        // needs a formatting trait in scope and is ambiguous as soon as more
        // than one of them (`Display`, `Debug`, ...) is imported.
        fmt::Display::fmt(&self.0, f)
    }
}
impl PartialEq<u32> for GrpcCancelHandle {
fn eq(&self, other: &u32) -> bool {
self.0 == *other
}
}
/// Mirror of `GrpcCancelHandle == u32` so the raw token may appear on the
/// left-hand side of the comparison.
impl PartialEq<GrpcCancelHandle> for u32 {
    fn eq(&self, other: &GrpcCancelHandle) -> bool {
        // Delegate to the handle-side implementation.
        <GrpcCancelHandle as PartialEq<u32>>::eq(other, self)
    }
}
/// Status code of a gRPC call.
///
/// The named variants carry the explicit discriminants 0 through 16. Any
/// other numeric value converted through `From<u32>` is preserved in
/// `Other`.
///
/// NOTE(review): names and numbers appear to follow the canonical gRPC status
/// code table — confirm against the gRPC specification.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u32)]
pub enum GrpcCode {
Ok = 0,
Cancelled = 1,
Unknown = 2,
InvalidArgument = 3,
DeadlineExceeded = 4,
NotFound = 5,
AlreadyExists = 6,
PermissionDenied = 7,
ResourceExhausted = 8,
FailedPrecondition = 9,
Aborted = 10,
OutOfRange = 11,
Unimplemented = 12,
Internal = 13,
Unavailable = 14,
DataLoss = 15,
Unauthenticated = 16,
/// Numeric code without a named variant; holds the raw value.
Other(u32),
}
impl From<u32> for GrpcCode {
fn from(value: u32) -> GrpcCode {
match value {
0 => GrpcCode::Ok,
1 => GrpcCode::Cancelled,
2 => GrpcCode::Unknown,
3 => GrpcCode::InvalidArgument,
4 => GrpcCode::DeadlineExceeded,
5 => GrpcCode::NotFound,
6 => GrpcCode::AlreadyExists,
7 => GrpcCode::PermissionDenied,
8 => GrpcCode::ResourceExhausted,
9 => GrpcCode::FailedPrecondition,
10 => GrpcCode::Aborted,
11 => GrpcCode::OutOfRange,
12 => GrpcCode::Unimplemented,
13 => GrpcCode::Internal,
14 => GrpcCode::Unavailable,
15 => GrpcCode::DataLoss,
16 => GrpcCode::Unauthenticated,
x => GrpcCode::Other(x),
}
}
}
/// Compares a status code with a raw number by first converting the number
/// through `From<u32>` and then comparing the two `GrpcCode` values.
impl PartialEq<u32> for GrpcCode {
    fn eq(&self, other: &u32) -> bool {
        let decoded: Self = (*other).into();
        *self == decoded
    }
}
/// Mirror of `GrpcCode == u32` so the raw number may appear on the left-hand
/// side of the comparison.
impl PartialEq<GrpcCode> for u32 {
    fn eq(&self, other: &GrpcCode) -> bool {
        // Delegate to the code-side implementation.
        <GrpcCode as PartialEq<u32>>::eq(other, self)
    }
}
/// Response information for a dispatched gRPC call, as handed to the
/// callback registered for that call.
pub struct GrpcCallResponse {
// Token id of the call this response belongs to.
handle_id: u32,
// Status code reported for the call.
status_code: GrpcCode,
// Body size in bytes; used as the upper bound when reading body ranges.
body_size: usize,
// Optional status message accompanying the status code.
message: Option<String>,
}
impl GrpcCallResponse {
    /// Creates the response record for the call identified by `token_id`.
    pub(crate) fn new(
        token_id: u32,
        status_code: GrpcCode,
        message: Option<String>,
        body_size: usize,
    ) -> Self {
        Self {
            handle_id: token_id,
            status_code,
            body_size,
            message,
        }
    }

    /// Token id of the call this response belongs to.
    pub fn handle_id(&self) -> u32 {
        self.handle_id
    }

    /// Status code reported for the call.
    pub fn status_code(&self) -> GrpcCode {
        self.status_code
    }

    /// Status message accompanying the status code, if one was provided.
    pub fn status_message(&self) -> Option<&str> {
        self.message.as_deref()
    }

    /// Size of the response body in bytes.
    pub fn body_size(&self) -> usize {
        self.body_size
    }

    /// Fetches all response headers from the host.
    ///
    /// The host result is passed through `log_concern` (tagged
    /// `"grpc-call-headers"`); an empty list is returned when nothing is
    /// available.
    pub fn headers(&self) -> Vec<(String, Vec<u8>)> {
        // NOTE(review): this reads `MapType::HttpCallResponseHeaders`; confirm
        // that is the intended map for a gRPC response.
        log_concern(
            "grpc-call-headers",
            hostcalls::get_map(MapType::HttpCallResponseHeaders),
        )
        .unwrap_or_default()
    }

    /// Fetches a single response header by `name` from the host.
    ///
    /// The host result is passed through `log_concern` (tagged
    /// `"grpc-call-header"`).
    pub fn header(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
        log_concern(
            "grpc-call-header",
            hostcalls::get_map_value(MapType::HttpCallResponseHeaders, name.as_ref()),
        )
    }

    /// Reads the bytes of the response body selected by `range`.
    ///
    /// The end of the range is clamped to `body_size`; a range that is empty
    /// after clamping requests zero bytes from the host. The host result is
    /// passed through `log_concern` (tagged `"grpc-call-body"`).
    pub fn body(&self, range: impl RangeBounds<usize>) -> Option<Vec<u8>> {
        // Offset of the first byte to read.
        let start = match range.start_bound() {
            Bound::Included(&first) => first,
            // An excluded lower bound `x` means reading begins at `x + 1`.
            Bound::Excluded(&before) => before.saturating_add(1),
            Bound::Unbounded => 0,
        };

        // Exclusive end offset, never past the end of the body. Saturating
        // arithmetic keeps `..=usize::MAX` from overflowing.
        let end = match range.end_bound() {
            Bound::Included(&last) => last.saturating_add(1),
            Bound::Excluded(&stop) => stop,
            Bound::Unbounded => self.body_size,
        }
        .min(self.body_size);

        let size = end.saturating_sub(start);

        log_concern(
            "grpc-call-body",
            hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, size),
        )
    }

    /// Reads the entire response body; shorthand for `body(..)`.
    pub fn full_body(&self) -> Option<Vec<u8>> {
        self.body(..)
    }

    /// Fetches all response trailers from the host.
    ///
    /// The host result is passed through `log_concern` (tagged
    /// `"grpc-call-trailers"`); an empty list is returned when nothing is
    /// available.
    pub fn trailers(&self) -> Vec<(String, Vec<u8>)> {
        // NOTE(review): this reads `MapType::HttpCallResponseTrailers`;
        // confirm that is the intended map for a gRPC response.
        log_concern(
            "grpc-call-trailers",
            hostcalls::get_map(MapType::HttpCallResponseTrailers),
        )
        .unwrap_or_default()
    }

    /// Fetches a single response trailer by `name` from the host.
    ///
    /// The host result is passed through `log_concern` (tagged
    /// `"grpc-call-trailer"`).
    pub fn trailer(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
        log_concern(
            "grpc-call-trailer",
            hostcalls::get_map_value(MapType::HttpCallResponseTrailers, name.as_ref()),
        )
    }
}