folk_plugin_grpc/
service.rs1use std::sync::Arc;
2
3use axum::body::Body;
4use axum::extract::State;
5use axum::response::IntoResponse;
6use bytes::Bytes;
7use folk_api::Executor;
8use http::Response;
9use http_body_util::BodyExt;
10use tracing::debug;
11
12#[derive(serde::Serialize, serde::Deserialize, Debug)]
13pub struct GrpcEnvelope {
14 pub service: String,
15 pub method: String,
16 pub payload: Vec<u8>,
17 pub metadata: std::collections::HashMap<String, String>,
18}
19
20#[derive(Clone)]
21pub struct GrpcState {
22 pub executor: Arc<dyn Executor>,
23}
24
25pub async fn grpc_handler(
26 State(state): State<GrpcState>,
27 req: axum::extract::Request,
28) -> impl IntoResponse {
29 let path = req.uri().path().to_string();
30
31 let parts: Vec<&str> = path.trim_start_matches('/').splitn(2, '/').collect();
32 let (service, method) = match parts.as_slice() {
33 [s, m] => (s.to_string(), m.to_string()),
34 _ => return grpc_error(12, "unimplemented: bad path"),
35 };
36
37 let metadata: std::collections::HashMap<String, String> = req
40 .headers()
41 .iter()
42 .filter(|(k, _)| {
43 let k = k.as_str();
44 !k.starts_with(':')
45 && k != "content-type"
46 && k != "te"
47 && k != "user-agent"
48 && k != "grpc-timeout"
49 && k != "grpc-encoding"
50 && k != "grpc-accept-encoding"
51 })
52 .filter_map(|(k, v)| {
53 v.to_str().ok().map(|v| (k.as_str().to_string(), v.to_string()))
54 })
55 .collect();
56
57 let body_bytes = match req.into_body().collect().await {
58 Ok(collected) => collected.to_bytes(),
59 Err(e) => return grpc_error(13, &format!("body: {e}")),
60 };
61
62 let proto_bytes = if body_bytes.len() >= 5 {
64 body_bytes.slice(5..)
65 } else {
66 body_bytes
67 };
68
69 debug!(
70 service,
71 method,
72 payload_len = proto_bytes.len(),
73 "gRPC call"
74 );
75
76 let envelope = GrpcEnvelope {
77 service,
78 method,
79 payload: proto_bytes.to_vec(),
80 metadata,
81 };
82 let encoded = match rmp_serde::to_vec_named(&envelope) {
83 Ok(v) => Bytes::from(v),
84 Err(e) => return grpc_error(13, &format!("encode: {e}")),
85 };
86
87 let raw_response = match state.executor.execute_method("grpc.call", encoded).await {
88 Ok(v) => v,
89 Err(e) => return grpc_error(13, &format!("worker: {e}")),
90 };
91
92 let proto_response = match rmp_serde::from_slice::<rmpv::Value>(&raw_response) {
94 Ok(rmpv::Value::Binary(b)) => b,
95 Ok(rmpv::Value::String(s)) => s.into_bytes(),
96 Ok(other) => {
97 return grpc_error(13, &format!("unexpected response type: {other:?}"));
98 }
99 Err(_) => raw_response.to_vec(),
100 };
101
102 let mut framed = Vec::with_capacity(5 + proto_response.len());
104 framed.push(0u8); framed.extend_from_slice(&(proto_response.len() as u32).to_be_bytes());
106 framed.extend_from_slice(&proto_response);
107
108 Response::builder()
109 .status(200)
110 .header("content-type", "application/grpc")
111 .header("grpc-status", "0")
112 .body(Body::from(framed))
113 .unwrap()
114}
115
116fn grpc_error(code: u32, msg: &str) -> Response<Body> {
117 Response::builder()
118 .status(200)
119 .header("content-type", "application/grpc")
120 .header("grpc-status", code.to_string())
121 .header("grpc-message", msg)
122 .body(Body::empty())
123 .unwrap()
124}