use std::{
fmt,
ops::{Bound, RangeBounds},
};
use derive_builder::Builder;
use crate::{
downcast_box::DowncastBox,
grpc_call::GrpcCode,
hostcalls::{self, BufferType},
log_concern, RootContext, Status, Upstream,
};
#[cfg(feature = "stream-metadata")]
use crate::hostcalls::MapType;
/// Description of a gRPC stream to open against an upstream cluster.
///
/// Values are assembled through the `derive_builder`-generated `GrpcStreamBuilder` (owned
/// pattern, setters accept `Into` conversions) and consumed by `GrpcStream::open`, which opens
/// the stream and registers whichever callbacks were configured.
#[derive(Builder)]
#[builder(setter(into))]
#[builder(pattern = "owned")]
// The boxed callback field types below are spelled out in full, which trips this lint.
#[allow(clippy::type_complexity)]
pub struct GrpcStream<'a> {
/// Upstream cluster the stream is opened against.
pub cluster: Upstream<'a>,
/// Service name passed to the host when opening the stream.
pub service: &'a str,
/// Method name passed to the host when opening the stream.
pub method: &'a str,
/// Metadata entries (name, raw value) passed to the host when opening the stream.
///
/// The builder exposes a `metadata` setter that appends one entry per call; defaults to empty.
#[builder(setter(each(name = "metadata")), default)]
pub initial_metadata: Vec<(&'a str, &'a [u8])>,
/// Optional callback registered for the stream's initial-metadata events on `open`.
///
/// Receives the type-erased root context, the stream handle and the metadata accessor.
#[cfg(feature = "stream-metadata")]
#[builder(setter(custom), default)]
pub on_initial_metadata: Option<
Box<
dyn FnMut(
&mut DowncastBox<dyn RootContext>,
GrpcStreamHandle,
&GrpcStreamInitialMetadata,
),
>,
>,
/// Optional callback registered for the stream's message events on `open`.
///
/// Receives the type-erased root context, the stream handle and the message accessor.
#[builder(setter(custom), default)]
pub on_message: Option<
Box<dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamMessage)>,
>,
/// Optional callback registered for the stream's trailing-metadata events on `open`.
///
/// Receives the type-erased root context, the stream handle and the metadata accessor.
#[cfg(feature = "stream-metadata")]
#[builder(setter(custom), default)]
pub on_trailing_metadata: Option<
Box<
dyn FnMut(
&mut DowncastBox<dyn RootContext>,
GrpcStreamHandle,
&GrpcStreamTrailingMetadata,
),
>,
>,
/// Optional callback registered for the stream's close event on `open`.
///
/// It is an `FnOnce`, so it can run at most once; it receives the type-erased root context
/// and the close information (no stream handle is passed).
#[builder(setter(custom), default)]
pub on_close: Option<Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcStreamClose)>>,
}
/// Typed callback setters for the builder of [`GrpcStream`].
///
/// Every setter takes a callback written against a concrete root context type `R` and stores
/// it as a type-erased closure that first downcasts the root context to `R` and then forwards
/// the call.
impl<'a> GrpcStreamBuilder<'a> {
    /// Sets the callback stored in `on_initial_metadata`.
    ///
    /// # Panics
    ///
    /// The stored closure panics with `invalid root type` when the root context it is invoked
    /// with cannot be downcast to `R`.
    #[cfg(feature = "stream-metadata")]
    pub fn on_initial_metadata<R: RootContext + 'static>(
        mut self,
        mut callback: impl FnMut(&mut R, GrpcStreamHandle, &GrpcStreamInitialMetadata) + 'static,
    ) -> Self {
        self.on_initial_metadata = Some(Some(Box::new(move |context, stream, initial| {
            let typed_root: &mut R = context
                .as_any_mut()
                .downcast_mut()
                .expect("invalid root type");
            callback(typed_root, stream, initial)
        })));
        self
    }

    /// Sets the callback stored in `on_message`.
    ///
    /// # Panics
    ///
    /// The stored closure panics with `invalid root type` when the root context it is invoked
    /// with cannot be downcast to `R`.
    pub fn on_message<R: RootContext + 'static>(
        mut self,
        mut callback: impl FnMut(&mut R, GrpcStreamHandle, &GrpcStreamMessage) + 'static,
    ) -> Self {
        self.on_message = Some(Some(Box::new(move |context, stream, incoming| {
            let typed_root: &mut R = context
                .as_any_mut()
                .downcast_mut()
                .expect("invalid root type");
            callback(typed_root, stream, incoming)
        })));
        self
    }

    /// Sets the callback stored in `on_trailing_metadata`.
    ///
    /// # Panics
    ///
    /// The stored closure panics with `invalid root type` when the root context it is invoked
    /// with cannot be downcast to `R`.
    #[cfg(feature = "stream-metadata")]
    pub fn on_trailing_metadata<R: RootContext + 'static>(
        mut self,
        mut callback: impl FnMut(&mut R, GrpcStreamHandle, &GrpcStreamTrailingMetadata) + 'static,
    ) -> Self {
        self.on_trailing_metadata = Some(Some(Box::new(move |context, stream, trailing| {
            let typed_root: &mut R = context
                .as_any_mut()
                .downcast_mut()
                .expect("invalid root type");
            callback(typed_root, stream, trailing)
        })));
        self
    }

    /// Sets the one-shot callback stored in `on_close`.
    ///
    /// # Panics
    ///
    /// The stored closure panics with `invalid root type` when the root context it is invoked
    /// with cannot be downcast to `R`.
    pub fn on_close<R: RootContext + 'static>(
        mut self,
        callback: impl FnOnce(&mut R, &GrpcStreamClose) + 'static,
    ) -> Self {
        self.on_close = Some(Some(Box::new(move |context, close_info| {
            let typed_root: &mut R = context
                .as_any_mut()
                .downcast_mut()
                .expect("invalid root type");
            callback(typed_root, close_info)
        })));
        self
    }
}
/// Copyable handle identifying an open gRPC stream by its `u32` token.
///
/// `Debug` and `Hash` are derived alongside the equality traits so handles can be logged with
/// `{:?}`, compared in `assert_eq!`, and used as keys in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct GrpcStreamHandle(pub(crate) u32);
impl<'a> GrpcStream<'a> {
pub fn open(self) -> Result<GrpcStreamHandle, Status> {
let token = hostcalls::open_grpc_stream(
&self.cluster.0,
self.service,
self.method,
&self.initial_metadata,
)?;
#[cfg(feature = "stream-metadata")]
if let Some(callback) = self.on_initial_metadata {
crate::dispatcher::register_grpc_stream_initial_meta(token, callback);
}
if let Some(callback) = self.on_message {
crate::dispatcher::register_grpc_stream_message(token, callback);
}
#[cfg(feature = "stream-metadata")]
if let Some(callback) = self.on_trailing_metadata {
crate::dispatcher::register_grpc_stream_trailing_metadata(token, callback);
}
if let Some(callback) = self.on_close {
crate::dispatcher::register_grpc_stream_close(token, callback);
}
Ok(GrpcStreamHandle(token))
}
}
impl GrpcStreamHandle {
    /// Asks the host to cancel this stream.
    ///
    /// Best-effort: the result of the host call is discarded.
    pub fn cancel(&self) {
        let _ = hostcalls::cancel_grpc_stream(self.0);
    }

    /// Asks the host to close this stream.
    ///
    /// Best-effort: the result of the host call is discarded.
    pub fn close(&self) {
        let _ = hostcalls::close_grpc_stream(self.0);
    }

    /// Sends an optional message on this stream.
    ///
    /// `message` is borrowed as a byte slice when present; `end_stream` is forwarded to the
    /// host call unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `hostcalls::send_grpc_stream_message`.
    pub fn send(&self, message: Option<impl AsRef<[u8]>>, end_stream: bool) -> Result<(), Status> {
        let payload: Option<&[u8]> = message.as_ref().map(|bytes| bytes.as_ref());
        hostcalls::send_grpc_stream_message(self.0, payload, end_stream)
    }
}
impl PartialEq<u32> for GrpcStreamHandle {
fn eq(&self, other: &u32) -> bool {
self.0 == *other
}
}
/// Allows `token == handle` comparisons with the raw `u32` token on the left-hand side.
impl PartialEq<GrpcStreamHandle> for u32 {
    fn eq(&self, other: &GrpcStreamHandle) -> bool {
        // Compare the raw token with the handle's wrapped token directly.
        *self == other.0
    }
}
impl fmt::Display for GrpcStreamHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
/// Accessor handed to `on_initial_metadata` callbacks of a gRPC stream.
///
/// Only the element count is stored; the metadata itself is fetched from the host on demand
/// by the accessor methods.
#[cfg(feature = "stream-metadata")]
pub struct GrpcStreamInitialMetadata {
/// Element count supplied at construction.
num_elements: usize,
}
#[cfg(feature = "stream-metadata")]
impl GrpcStreamInitialMetadata {
    /// Creates an accessor that reports `num_elements` entries.
    pub(crate) fn new(num_elements: usize) -> Self {
        GrpcStreamInitialMetadata { num_elements }
    }

    /// Returns the element count supplied at construction.
    pub fn num_elements(&self) -> usize {
        self.num_elements
    }

    /// Fetches the whole `MapType::GrpcReceiveInitialMetadata` map from the host.
    ///
    /// The host call result is passed through `log_concern`; when that yields no value an
    /// empty vector is returned.
    pub fn all(&self) -> Vec<(String, Vec<u8>)> {
        let fetched = hostcalls::get_map(MapType::GrpcReceiveInitialMetadata);
        log_concern("grpc-stream-metadata-all", fetched).unwrap_or_default()
    }

    /// Looks up a single entry of the `MapType::GrpcReceiveInitialMetadata` map by name.
    ///
    /// The host call result is passed through `log_concern`, whose output is returned as-is.
    pub fn value(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
        let key = name.as_ref();
        let looked_up = hostcalls::get_map_value(MapType::GrpcReceiveInitialMetadata, key);
        log_concern("grpc-stream-metadata", looked_up)
    }
}
/// Accessor handed to `on_message` callbacks of a gRPC stream.
///
/// Holds the status and body size of the event; the body bytes themselves are fetched from
/// the host on demand by `body` / `full_body`.
pub struct GrpcStreamMessage {
/// Status code supplied at construction.
status_code: GrpcCode,
/// Body size in bytes; used as the upper limit when reading the body.
body_size: usize,
/// Optional status message supplied at construction.
message: Option<String>,
}
impl GrpcStreamMessage {
    /// Creates a message accessor from the status, optional status message and body size.
    pub(crate) fn new(status_code: GrpcCode, message: Option<String>, body_size: usize) -> Self {
        Self {
            status_code,
            body_size,
            message,
        }
    }

    /// Returns the status code supplied at construction.
    pub fn status_code(&self) -> GrpcCode {
        self.status_code
    }

    /// Returns the status message, if one was supplied at construction.
    pub fn status_message(&self) -> Option<&str> {
        self.message.as_deref()
    }

    /// Returns the body size in bytes supplied at construction.
    pub fn body_size(&self) -> usize {
        self.body_size
    }

    /// Reads the bytes of the message body that fall inside `range`.
    ///
    /// The range end is clamped to [`body_size`](Self::body_size); a range that is empty after
    /// clamping requests zero bytes. The bytes are fetched from
    /// `BufferType::GrpcReceiveBuffer`, and the host call result is passed through
    /// `log_concern`, whose output is returned as-is.
    pub fn body(&self, range: impl RangeBounds<usize>) -> Option<Vec<u8>> {
        // Offset of the first byte to read. An excluded start bound `x` means the range begins
        // right *after* `x`, i.e. at `x + 1` (saturating so `usize::MAX` cannot overflow).
        let start = match range.start_bound() {
            Bound::Included(&first) => first,
            Bound::Excluded(&before) => before.saturating_add(1),
            Bound::Unbounded => 0,
        };
        // One-past-the-last offset to read, never beyond the body. The inclusive case uses a
        // saturating add so that `..=usize::MAX` does not overflow.
        let end = match range.end_bound() {
            Bound::Included(&last) => last.saturating_add(1),
            Bound::Excluded(&past_last) => past_last,
            Bound::Unbounded => self.body_size,
        }
        .min(self.body_size);
        let size = end.saturating_sub(start);
        log_concern(
            "grpc-stream-message-body",
            hostcalls::get_buffer(BufferType::GrpcReceiveBuffer, start, size),
        )
    }

    /// Reads the entire message body (`0..body_size`).
    pub fn full_body(&self) -> Option<Vec<u8>> {
        self.body(..self.body_size)
    }
}
/// Accessor handed to `on_trailing_metadata` callbacks of a gRPC stream.
///
/// Only the element count is stored; the metadata itself is fetched from the host on demand
/// by the accessor methods.
#[cfg(feature = "stream-metadata")]
pub struct GrpcStreamTrailingMetadata {
/// Element count supplied at construction.
num_elements: usize,
}
#[cfg(feature = "stream-metadata")]
impl GrpcStreamTrailingMetadata {
    /// Creates an accessor that reports `num_elements` entries.
    pub(crate) fn new(num_elements: usize) -> Self {
        GrpcStreamTrailingMetadata { num_elements }
    }

    /// Returns the element count supplied at construction.
    pub fn num_elements(&self) -> usize {
        self.num_elements
    }

    /// Fetches the whole `MapType::GrpcReceiveTrailingMetadata` map from the host.
    ///
    /// The host call result is passed through `log_concern`; when that yields no value an
    /// empty vector is returned.
    pub fn all(&self) -> Vec<(String, Vec<u8>)> {
        let fetched = hostcalls::get_map(MapType::GrpcReceiveTrailingMetadata);
        log_concern("grpc-stream-trailing-metadata-all", fetched).unwrap_or_default()
    }

    /// Looks up a single entry of the `MapType::GrpcReceiveTrailingMetadata` map by name.
    ///
    /// The host call result is passed through `log_concern`, whose output is returned as-is.
    pub fn value(&self, name: impl AsRef<str>) -> Option<Vec<u8>> {
        let key = name.as_ref();
        let looked_up = hostcalls::get_map_value(MapType::GrpcReceiveTrailingMetadata, key);
        log_concern("grpc-stream-trailing-metadata", looked_up)
    }
}
/// Information handed to the `on_close` callback of a gRPC stream.
pub struct GrpcStreamClose {
/// Raw `u32` token supplied at construction (the `token_id` argument of `new`).
handle_id: u32,
/// Status code supplied at construction.
status_code: GrpcCode,
/// Optional status message supplied at construction.
message: Option<String>,
}
impl GrpcStreamClose {
pub(crate) fn new(token_id: u32, status_code: GrpcCode, message: Option<String>) -> Self {
Self {
handle_id: token_id,
status_code,
message,
}
}
pub fn handle_id(&self) -> u32 {
self.handle_id
}
pub fn status_code(&self) -> GrpcCode {
self.status_code
}
pub fn status_message(&self) -> Option<&str> {
self.message.as_deref()
}
}