folk-plugin-grpc 0.1.0

gRPC plugin for Folk — unary call passthrough to PHP workers via tonic
Documentation
use std::sync::Arc;

use axum::body::Body;
use axum::extract::State;
use axum::response::IntoResponse;
use bytes::Bytes;
use folk_api::Executor;
use http::Response;
use http_body_util::BodyExt;
use tracing::debug;

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct GrpcEnvelope {
    pub service: String,
    pub method: String,
    pub payload: Vec<u8>,
}

#[derive(Clone)]
pub struct GrpcState {
    pub executor: Arc<dyn Executor>,
}

pub async fn grpc_handler(
    State(state): State<GrpcState>,
    req: axum::extract::Request,
) -> impl IntoResponse {
    let path = req.uri().path().to_string();

    let parts: Vec<&str> = path.trim_start_matches('/').splitn(2, '/').collect();
    let (service, method) = match parts.as_slice() {
        [s, m] => (s.to_string(), m.to_string()),
        _ => return grpc_error(12, "unimplemented: bad path"),
    };

    let body_bytes = match req.into_body().collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(e) => return grpc_error(13, &format!("body: {e}")),
    };

    // Strip 5-byte gRPC framing (compression flag + 4-byte length)
    let proto_bytes = if body_bytes.len() >= 5 {
        body_bytes.slice(5..)
    } else {
        body_bytes
    };

    debug!(
        service,
        method,
        payload_len = proto_bytes.len(),
        "gRPC call"
    );

    let envelope = GrpcEnvelope {
        service,
        method,
        payload: proto_bytes.to_vec(),
    };
    let encoded = match rmp_serde::to_vec_named(&envelope) {
        Ok(v) => Bytes::from(v),
        Err(e) => return grpc_error(13, &format!("encode: {e}")),
    };

    let response_bytes = match state.executor.execute(encoded).await {
        Ok(v) => v,
        Err(e) => return grpc_error(13, &format!("worker: {e}")),
    };

    // Re-add 5-byte gRPC framing to response
    let mut framed = Vec::with_capacity(5 + response_bytes.len());
    framed.push(0u8); // no compression
    framed.extend_from_slice(&(response_bytes.len() as u32).to_be_bytes());
    framed.extend_from_slice(&response_bytes);

    Response::builder()
        .status(200)
        .header("content-type", "application/grpc")
        .header("grpc-status", "0")
        .body(Body::from(framed))
        .unwrap()
}

fn grpc_error(code: u32, msg: &str) -> Response<Body> {
    Response::builder()
        .status(200)
        .header("content-type", "application/grpc")
        .header("grpc-status", code.to_string())
        .header("grpc-message", msg)
        .body(Body::empty())
        .unwrap()
}