pdk-classy 1.9.1-alpha.2

PDK Classy
Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::convert::Infallible;
use std::rc::Rc;
use std::time::Duration;

use crate::proxy_wasm::types::Status;

use crate::client::Service;
use crate::extract::context::ConfigureContext;
use crate::extract::{extractability, FromContext};
use crate::host::grpc::GrpcHost;

use crate::http_constants::DEFAULT_TIMEOUT;
use crate::reactor::root::RootReactor;

use crate::grpc::call::GrpcCall;
use crate::grpc::error::GrpcCallError;
use crate::grpc::transport::GrpcCallId;

use super::codec::{BytesCodec, Codec, Encoder};
use super::GrpcProxyError;

/// An asynchronous gRPC client to make calls with.
pub struct GrpcClient {
    reactor: Rc<RootReactor>,
    grpc_host: Rc<dyn GrpcHost>,
}

impl FromContext<ConfigureContext, extractability::Transitive> for GrpcClient {
    type Error = Infallible;

    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
        Ok(Self {
            reactor: Rc::clone(&context.root_reactor),
            grpc_host: Rc::clone(&context.grpc_host),
        })
    }
}

impl GrpcClient {
    /// Creates a gRPC request that will forward the whole response to the caller.
    pub fn request<'a>(&'a self, upstream: &'a Service) -> GrpcServiceBuilder<'a> {
        GrpcServiceBuilder::new(self.reactor.clone(), self.grpc_host.clone(), upstream)
    }
}

/// A gRPC request builder for gRPC services.
pub struct GrpcServiceBuilder<'a> {
    reactor: Rc<RootReactor>,
    host: Rc<dyn GrpcHost>,
    upstream: &'a Service,
}

impl<'a> GrpcServiceBuilder<'a> {
    pub(super) fn new(
        reactor: Rc<RootReactor>,
        host: Rc<dyn GrpcHost>,
        upstream: &'a Service,
    ) -> Self {
        Self {
            reactor,
            host,
            upstream,
        }
    }

    /// Sets the gRPC service.
    pub fn service(self, service: &'a str) -> GrpcMethodBuilder<'a> {
        GrpcMethodBuilder {
            reactor: self.reactor,
            host: self.host,
            upstream: self.upstream,
            service,
        }
    }
}

/// A gRPC request builder for gRPC methods.
pub struct GrpcMethodBuilder<'a> {
    reactor: Rc<RootReactor>,
    host: Rc<dyn GrpcHost>,
    upstream: &'a Service,
    service: &'a str,
}

/// A gRPC request builder for gRPC methods.
impl<'a> GrpcMethodBuilder<'a> {
    /// Sets the gRPC method.
    pub fn method(self, method: &'a str) -> GrpcCallBuilder<'a, BytesCodec> {
        GrpcCallBuilder {
            reactor: self.reactor,
            host: self.host,
            upstream: self.upstream,
            service: self.service,
            method,
            initial_metadata: Vec::new(),
            timeout: DEFAULT_TIMEOUT,
            codec: BytesCodec,
        }
    }
}

/// A gRPC call builder.
pub struct GrpcCallBuilder<'a, C> {
    reactor: Rc<RootReactor>,
    host: Rc<dyn GrpcHost>,
    upstream: &'a Service,
    service: &'a str,
    method: &'a str,
    initial_metadata: Vec<(&'a str, &'a [u8])>,
    timeout: Duration,
    codec: C,
}

impl<'a, C> GrpcCallBuilder<'a, C>
where
    C: Codec,
{
    /// Sets the initial metadata.
    pub fn initial_metadata(self, initial_metadata: Vec<(&'a str, &'a [u8])>) -> Self {
        Self {
            initial_metadata,
            ..self
        }
    }

    /// Adds initial metadata.
    pub fn add_initial_metadata(mut self, name: &'a str, values: &'a [u8]) -> Self {
        self.initial_metadata.push((name, values));
        self
    }

    /// Sets the request timeout.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout, ..self }
    }

    /// Sends a oneshot gRPC request to the upstream.
    pub fn send(self, message: &<C::Encoder as Encoder>::Input) -> GrpcCall<C::Decoder> {
        let (mut encoder, decoder) = self.codec.into_components();
        let upstream = self.upstream.cluster_name();
        let result = match encoder.encode(message) {
            Ok(bytes) => self
                .host
                .dispatch_grpc_call(
                    upstream,
                    self.service,
                    self.method,
                    self.initial_metadata,
                    Some(bytes.as_ref()),
                    self.timeout,
                )
                .map(GrpcCallId::new)
                .map_err(|s| {
                    if matches!(s, Status::ParseFailure) {
                        GrpcCallError::InvalidUpstream(upstream.to_string())
                    } else {
                        GrpcCallError::Proxy(GrpcProxyError::new(s))
                    }
                }),
            Err(e) => Err(GrpcCallError::Encode(e.into())),
        };
        GrpcCall::new(self.reactor, self.host, decoder, result)
    }

    pub(super) fn codec<T: Codec>(self, codec: T) -> GrpcCallBuilder<'a, T> {
        GrpcCallBuilder {
            reactor: self.reactor,
            host: self.host,
            upstream: self.upstream,
            service: self.service,
            method: self.method,
            initial_metadata: self.initial_metadata,
            timeout: self.timeout,
            codec,
        }
    }
}