// knafeh 1.1.0
//
// QUIC-based RPC library with Python bindings.
// Documentation
use std::sync::Arc;

use crate::codec::Codec;
use crate::error::{KnafehError, RpcStatusCode};
use crate::rpc::message::{Metadata, RpcRequest};
use crate::rpc::middleware::MiddlewareStack;
use crate::rpc::router::MethodRouter;
use crate::rpc::stream::RpcStreamReceiver;

/// Dispatches incoming HTTP/3 requests through the middleware stack and router.
///
/// All collaborators are held behind [`Arc`]; the handler itself only needs
/// `&self` to serve a request.
pub struct RequestHandler {
    /// Resolves a request to its service handler (`route_unary` /
    /// `route_server_stream`).
    pub(crate) router: Arc<MethodRouter>,
    /// Interceptors applied to each request before routing and, for unary
    /// calls, to the response after routing.
    pub(crate) middleware: Arc<MiddlewareStack>,
    /// Decodes raw request bodies and encodes unary response bodies.
    pub(crate) codec: Arc<dyn Codec>,
}

impl RequestHandler {
    pub fn new(
        router: Arc<MethodRouter>,
        middleware: Arc<MiddlewareStack>,
        codec: Arc<dyn Codec>,
    ) -> Self {
        Self {
            router,
            middleware,
            codec,
        }
    }

    /// Handle a unary RPC request end-to-end:
    /// decode → middleware.on_request → route → middleware.on_response → encode.
    pub async fn handle_unary(
        &self,
        method_path: String,
        raw_body: Vec<u8>,
        metadata: Metadata,
    ) -> (Vec<u8>, RpcStatusCode, String, Metadata) {
        // Decode the request body through the codec.
        let body = match self.codec.decode(&raw_body) {
            Ok(b) => b,
            Err(e) => {
                return (
                    Vec::new(),
                    RpcStatusCode::InvalidArgument,
                    format!("codec decode error: {e}"),
                    Metadata::new(),
                );
            }
        };

        let mut request = RpcRequest {
            method: method_path,
            metadata,
            body,
        };

        // Run request interceptors.
        if let Err(e) = self.middleware.apply_request(&mut request).await {
            return (
                Vec::new(),
                RpcStatusCode::Internal,
                format!("middleware error: {e}"),
                Metadata::new(),
            );
        }

        // Route to the service handler.
        let mut response = match self.router.route_unary(request).await {
            Ok(resp) => resp,
            Err(KnafehError::Service { code, message }) => {
                return (Vec::new(), code, message, Metadata::new());
            }
            Err(e) => {
                return (
                    Vec::new(),
                    RpcStatusCode::Internal,
                    format!("handler error: {e}"),
                    Metadata::new(),
                );
            }
        };

        // Run response interceptors.
        if let Err(e) = self.middleware.apply_response(&mut response).await {
            return (
                Vec::new(),
                RpcStatusCode::Internal,
                format!("middleware error: {e}"),
                Metadata::new(),
            );
        }

        // Encode the response body (skip for empty bodies — error responses
        // have no payload and empty bytes is not valid JSON).
        let encoded_body = if response.body.is_empty() {
            Vec::new()
        } else {
            match self.codec.encode(&response.body) {
                Ok(b) => b,
                Err(e) => {
                    return (
                        Vec::new(),
                        RpcStatusCode::Internal,
                        format!("codec encode error: {e}"),
                        Metadata::new(),
                    );
                }
            }
        };

        (
            encoded_body,
            response.status.code,
            response.status.message,
            response.metadata,
        )
    }

    /// Handle a server-streaming RPC request:
    /// decode → middleware.on_request → route → return stream receiver.
    ///
    /// Returns `(status_code, status_message, metadata, Option<receiver>)`.
    /// On success the receiver yields raw (un-encoded) chunks; the caller
    /// is responsible for encoding each chunk before sending.
    pub async fn handle_server_stream(
        &self,
        method_path: String,
        raw_body: Vec<u8>,
        metadata: Metadata,
    ) -> (RpcStatusCode, String, Metadata, Option<RpcStreamReceiver>) {
        let body = match self.codec.decode(&raw_body) {
            Ok(b) => b,
            Err(e) => {
                return (
                    RpcStatusCode::InvalidArgument,
                    format!("codec decode error: {e}"),
                    Metadata::new(),
                    None,
                );
            }
        };

        let mut request = RpcRequest {
            method: method_path,
            metadata,
            body,
        };

        if let Err(e) = self.middleware.apply_request(&mut request).await {
            return (
                RpcStatusCode::Internal,
                format!("middleware error: {e}"),
                Metadata::new(),
                None,
            );
        }

        match self.router.route_server_stream(request).await {
            Ok(stream_response) => (
                RpcStatusCode::Ok,
                String::new(),
                stream_response.metadata,
                Some(stream_response.receiver),
            ),
            Err(KnafehError::Service { code, message }) => (code, message, Metadata::new(), None),
            Err(e) => (
                RpcStatusCode::Internal,
                format!("handler error: {e}"),
                Metadata::new(),
                None,
            ),
        }
    }
}