folk-plugin-grpc 0.1.6

gRPC plugin for Folk — unary call passthrough to PHP workers via tonic
Documentation
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use axum::extract::State;
use axum::response::IntoResponse;
use bytes::Bytes;
use folk_api::Executor;
use http::{HeaderMap, HeaderValue, Response};
use http_body::Frame;
use http_body_util::BodyExt;
use tracing::debug;

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct GrpcEnvelope {
    pub service: String,
    pub method: String,
    pub payload: Vec<u8>,
    pub metadata: std::collections::HashMap<String, String>,
}

#[derive(Clone)]
pub struct GrpcState {
    pub executor: Arc<dyn Executor>,
}

pub async fn grpc_handler(
    State(state): State<GrpcState>,
    req: axum::extract::Request,
) -> impl IntoResponse {
    let path = req.uri().path().to_string();

    let parts: Vec<&str> = path.trim_start_matches('/').splitn(2, '/').collect();
    let (service, method) = match parts.as_slice() {
        [s, m] => (s.to_string(), m.to_string()),
        _ => return grpc_response(Bytes::new(), 12, "unimplemented: bad path"),
    };

    let metadata: std::collections::HashMap<String, String> = req
        .headers()
        .iter()
        .filter(|(k, _)| {
            let k = k.as_str();
            !k.starts_with(':')
                && k != "content-type"
                && k != "te"
                && k != "user-agent"
                && k != "grpc-timeout"
                && k != "grpc-encoding"
                && k != "grpc-accept-encoding"
        })
        .filter_map(|(k, v)| {
            v.to_str()
                .ok()
                .map(|v| (k.as_str().to_string(), v.to_string()))
        })
        .collect();

    let body_bytes = match req.into_body().collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(e) => return grpc_response(Bytes::new(), 13, &format!("body: {e}")),
    };

    let proto_bytes = if body_bytes.len() >= 5 {
        body_bytes.slice(5..)
    } else {
        body_bytes
    };

    debug!(service, method, payload_len = proto_bytes.len(), "gRPC call");

    let envelope = GrpcEnvelope {
        service,
        method,
        payload: proto_bytes.to_vec(),
        metadata,
    };
    let encoded = match rmp_serde::to_vec_named(&envelope) {
        Ok(v) => Bytes::from(v),
        Err(e) => return grpc_response(Bytes::new(), 13, &format!("encode: {e}")),
    };

    let raw_response = match state.executor.execute_method("grpc.call", encoded).await {
        Ok(v) => v,
        Err(e) => return grpc_response(Bytes::new(), 13, &format!("worker: {e}")),
    };

    let proto_response = match rmp_serde::from_slice::<rmpv::Value>(&raw_response) {
        Ok(rmpv::Value::Binary(b)) => b,
        Ok(rmpv::Value::String(s)) => s.into_bytes(),
        Ok(other) => {
            return grpc_response(
                Bytes::new(),
                13,
                &format!("unexpected response type: {other:?}"),
            );
        }
        Err(_) => raw_response.to_vec(),
    };

    // Build gRPC-framed response
    let mut framed = Vec::with_capacity(5 + proto_response.len());
    framed.push(0u8);
    framed.extend_from_slice(&(proto_response.len() as u32).to_be_bytes());
    framed.extend_from_slice(&proto_response);

    grpc_response(Bytes::from(framed), 0, "")
}

/// Build a gRPC response with proper HTTP/2 trailers.
///
/// gRPC requires `grpc-status` (and optionally `grpc-message`) in trailers,
/// not in response headers. This is what grpcurl and other strict clients expect.
fn grpc_response(data: Bytes, status: u32, message: &str) -> Response<GrpcBody> {
    let mut trailers = HeaderMap::new();
    trailers.insert("grpc-status", HeaderValue::from(status));
    if !message.is_empty() {
        if let Ok(v) = HeaderValue::from_str(message) {
            trailers.insert("grpc-message", v);
        }
    }

    Response::builder()
        .status(200)
        .header("content-type", "application/grpc")
        .body(GrpcBody {
            data: Some(data),
            trailers: Some(trailers),
        })
        .unwrap()
}

/// Custom body type that sends data followed by gRPC trailers.
pub struct GrpcBody {
    data: Option<Bytes>,
    trailers: Option<HeaderMap>,
}

impl http_body::Body for GrpcBody {
    type Data = Bytes;
    type Error = std::convert::Infallible;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        if let Some(data) = self.data.take() {
            if !data.is_empty() {
                return Poll::Ready(Some(Ok(Frame::data(data))));
            }
        }
        if let Some(trailers) = self.trailers.take() {
            return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
        }
        Poll::Ready(None)
    }
}