grpc 0.9.0-alpha.2

The official Rust implementation of gRPC: a high performance, open source, universal RPC framework.
Documentation
/*
 *
 * Copyright 2025 gRPC authors.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 *
 */

use std::sync::Arc;

use tokio::sync::oneshot;
use tonic::async_trait;

use crate::client::CallOptions;
use crate::core::RecvMessage;
use crate::core::RequestHeaders;
use crate::core::ResponseHeaders;
use crate::core::SendMessage;
use crate::core::Trailers;

pub(crate) mod interceptor;

/// A gRPC server that dispatches every accepted call to a single handler.
///
/// The handler starts out unset (see [`Server::new`]) and is installed with
/// [`Server::set_handler`]; [`Server::serve`] needs it to be present when a
/// call is accepted.
pub struct Server {
    // Type-erased handler shared behind an `Arc`; `None` until `set_handler`
    // has been called.
    handler: Option<Arc<dyn DynHandle>>,
}

/// A single incoming RPC, as produced by [`Listener::accept`].
///
/// `SS` and `RS` are the listener's send-stream and receive-stream types.
pub struct Call<SS, RS> {
    /// Request headers of the call; handed to the handler unchanged.
    pub headers: RequestHeaders,
    /// Stream handed to the handler for sending response items.
    pub send: SS,
    /// Stream handed to the handler for receiving request messages.
    pub recv: RS,
    /// One-shot channel on which [`Server::serve`] delivers the trailers
    /// returned by the handler once it has finished with the call.
    pub trailers_tx: oneshot::Sender<Trailers>,
}

/// A source of incoming calls for a [`Server`].
#[trait_variant::make(Send)]
pub trait Listener {
    /// Stream type used to send response items for an accepted call.
    type SendStream: SendStream + 'static;
    /// Stream type used to receive request messages for an accepted call.
    type RecvStream: RecvStream + 'static;
    /// Waits for the next incoming call.
    ///
    /// A `None` result ends the accept loop in [`Server::serve`].
    async fn accept(&self) -> Option<Call<Self::SendStream, Self::RecvStream>>;
}

impl Server {
    pub fn new() -> Self {
        Self { handler: None }
    }

    pub fn set_handler<H>(&mut self, h: H)
    where
        H: Handle + Send + Sync + 'static,
    {
        self.handler = Some(Arc::new(h))
    }

    pub async fn serve(&self, l: &impl Listener) {
        while let Some(call) = l.accept().await {
            let mut send: Box<dyn DynSendStream> = Box::new(call.send);
            let recv = BoxedRecvStream(Box::new(call.recv));
            let options = CallOptions::default();
            let trailers_tx = call.trailers_tx;
            let trailers = self
                .handler
                .as_ref()
                .unwrap()
                .dyn_handle(call.headers, options, &mut *send, recv)
                .await;
            let _ = trailers_tx.send(trailers);
        }
    }
}

impl Default for Server {
    fn default() -> Self {
        Self::new()
    }
}

/// A trait which may be implemented by types to handle the server-side logic
/// of RPCs (Remote Procedure Calls, often shortened to "calls").
#[trait_variant::make(Send)]
pub trait Handle: Send + Sync {
    /// Handles a single RPC, accepting the send and receive streams that are
    /// used to interact with the call.
    ///
    /// * `headers` - the request headers of the call.
    /// * `options` - the options associated with the call.
    /// * `tx` - the stream used to send response items.
    /// * `rx` - the stream used to receive request messages.
    ///
    /// Returns the trailers for the call.
    ///
    /// Note that `tx` is a borrowed, non-`'static` stream, so it cannot be
    /// sent to another task, meaning the RPC must end before `handle`
    /// returns.
    async fn handle(
        &self,
        headers: RequestHeaders,
        options: CallOptions,
        tx: &mut impl SendStream,
        rx: impl RecvStream + 'static,
    ) -> Trailers;
}

/// Dyn-compatible counterpart of [`Handle`].
///
/// [`Handle::handle`] takes `impl Trait` arguments and therefore cannot be
/// called through a trait object; this trait uses type-erased stream
/// arguments instead so that a handler can be stored as `Arc<dyn DynHandle>`.
/// It is implemented for every [`Handle`] by a blanket impl.
#[async_trait]
trait DynHandle: Send + Sync {
    /// Type-erased equivalent of [`Handle::handle`]; returns the call's
    /// trailers.
    async fn dyn_handle(
        &self,
        headers: RequestHeaders,
        options: CallOptions,
        tx: &mut dyn DynSendStream,
        rx: BoxedRecvStream,
    ) -> Trailers;
}

#[async_trait]
impl<T: Handle> DynHandle for T {
    async fn dyn_handle(
        &self,
        headers: RequestHeaders,
        options: CallOptions,
        mut tx: &mut dyn DynSendStream,
        rx: BoxedRecvStream,
    ) -> Trailers {
        self.handle(headers, options, &mut tx, rx).await
    }
}

// Newtype around a boxed, type-erased receive stream with an explicit
// `'static` object lifetime.
//
// TODO: delete this type, which is only needed pre-rust v1.92 due to a bug
// in lifetime handling:
//
// error: implementation of `server::RecvStream` is not general enough
//    --> grpc/src/server/mod.rs:108:5
//     |
// 108 |     async fn dyn_handle(
//     |     ^^^^^ implementation of `server::RecvStream` is not general enough
//     |
//     = note: `Box<(dyn server::DynRecvStream + '0)>` must implement `server::RecvStream`, for any lifetime `'0`...
//     = note: ...but `server::RecvStream` is actually implemented for the type `Box<(dyn server::DynRecvStream + 'static)>`
struct BoxedRecvStream(Box<dyn DynRecvStream + 'static>);

// `RecvStream` is implemented on the wrapper rather than on the `Box`
// directly; every call is forwarded to the wrapped boxed stream.
impl RecvStream for BoxedRecvStream {
    async fn next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
        let pending = self.0.dyn_next(msg);
        pending.await
    }
}

/// An item in a response stream from the server's view.
///
/// These items are sent to the client via a [`SendStream`], using references to
/// avoid allocations.  The lifetime `'a` is that of the borrowed message in
/// the [`ResponseStreamItem::Message`] variant.
pub enum ResponseStreamItem<'a> {
    /// Indicates the headers for the stream.
    Headers(ResponseHeaders),
    /// Indicates a message on the stream, borrowed from the caller.
    Message(&'a dyn SendMessage),
}

// NOTE(review): this documentation previously referred to `ResponseStream`,
// which is not defined in this module -- confirm where the rules about item
// ordering are documented and link to them here.
/// Represents the sending side of a server stream.  See
/// [`ResponseStreamItem`] for the different types of items that may be sent.
#[trait_variant::make(Send)]
pub trait SendStream {
    /// Sends the next item on the stream, applying `options` to this send
    /// operation. Returns `Ok(())` on success, or `Err(())` on failure.
    /// `Err(())` is a terminal state.
    /// Calling this method after an error should be avoided and is unspecified.
    ///
    /// # Cancel safety
    ///
    /// This method is not intended to be cancellation safe.  If the returned
    /// future is not polled to completion, the behavior of any subsequent calls
    /// to the SendStream are undefined and data may be lost.
    async fn send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()>;
}

/// Dyn-compatible counterpart of [`SendStream`], used where the concrete
/// stream type has to be erased (e.g. `Box<dyn DynSendStream>` or
/// `&mut dyn DynSendStream`).  It is implemented for every [`SendStream`] by a
/// blanket impl.
#[async_trait]
trait DynSendStream: Send {
    /// Type-erased equivalent of [`SendStream::send`].
    async fn dyn_send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()>;
}

// Blanket adapter: any `SendStream` can be used through the type-erased
// `DynSendStream` interface.
#[async_trait]
impl<S: SendStream> DynSendStream for S {
    async fn dyn_send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()> {
        // Forward to the statically-typed implementation.
        <S as SendStream>::send(self, item, options).await
    }
}

impl<'b> SendStream for &mut (dyn DynSendStream + 'b) {
    async fn send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()> {
        (**self).dyn_send(item, options).await
    }
}

impl<'b> SendStream for Box<dyn DynSendStream + 'b> {
    async fn send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()> {
        (**self).dyn_send(item, options).await
    }
}

/// Contains settings to configure a send operation on a [`SendStream`].
///
/// The derived `Default` leaves both flags `false`.  The struct is
/// `#[non_exhaustive]`, so code outside this crate cannot build it with a
/// struct literal; start from `SendOptions::default()` and set fields.
#[derive(Default)]
#[non_exhaustive]
pub struct SendOptions {
    /// Delays sending the message until the trailers are provided on the stream
    /// and batches the two items together if possible.
    pub final_msg: bool,
    /// If set, compression will be disabled for this message.
    pub disable_compression: bool,
}

/// Represents the receiving side of a server stream.
#[trait_variant::make(Send)]
pub trait RecvStream {
    /// Returns the next message on the stream. Returns `Some(Ok(()))` on
    /// success, `None` on normal stream end, or `Some(Err(()))` if the stream
    /// encountered an error before the client's final request message. Both
    /// `None` and `Some(Err(()))` are terminal states.
    /// Calling this method again after reaching a terminal state is unspecified
    /// and should be avoided.
    ///
    /// `msg` is supplied by the caller; since the return value carries no
    /// payload, it is presumably where the received message is stored
    /// (TODO: confirm against `RecvMessage`).
    ///
    /// # Cancel safety
    ///
    /// This method is not intended to be cancellation safe.  If the returned
    /// future is not polled to completion, the behavior of any subsequent calls
    /// to the RecvStream are undefined and data may be lost.
    async fn next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>>;
}

/// Dyn-compatible counterpart of [`RecvStream`], used where the concrete
/// stream type has to be erased (e.g. `Box<dyn DynRecvStream>`).  It is
/// implemented for every [`RecvStream`] by a blanket impl.
#[async_trait]
trait DynRecvStream: Send {
    /// Type-erased equivalent of [`RecvStream::next`].
    async fn dyn_next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>>;
}

// Blanket adapter: any `RecvStream` can be used through the type-erased
// `DynRecvStream` interface.
#[async_trait]
impl<R: RecvStream> DynRecvStream for R {
    async fn dyn_next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
        // Forward to the statically-typed implementation.
        <R as RecvStream>::next(self, msg).await
    }
}

impl<'a> RecvStream for Box<dyn DynRecvStream + 'a> {
    async fn next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
        (**self).dyn_next(msg).await
    }
}