folk_plugin_grpc/
service.rs1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use axum::extract::State;
6use axum::response::IntoResponse;
7use bytes::Bytes;
8use folk_api::Executor;
9use http::{HeaderMap, HeaderValue, Response};
10use http_body::Frame;
11use http_body_util::BodyExt;
12use tracing::debug;
13
14#[derive(serde::Serialize, serde::Deserialize, Debug)]
15pub struct GrpcEnvelope {
16 pub service: String,
17 pub method: String,
18 pub payload: Vec<u8>,
19 pub metadata: std::collections::HashMap<String, String>,
20}
21
22#[derive(Clone)]
23pub struct GrpcState {
24 pub executor: Arc<dyn Executor>,
25}
26
27pub async fn grpc_handler(
28 State(state): State<GrpcState>,
29 req: axum::extract::Request,
30) -> impl IntoResponse {
31 let path = req.uri().path().to_string();
32
33 let parts: Vec<&str> = path.trim_start_matches('/').splitn(2, '/').collect();
34 let (service, method) = match parts.as_slice() {
35 [s, m] => (s.to_string(), m.to_string()),
36 _ => return grpc_response(Bytes::new(), 12, "unimplemented: bad path"),
37 };
38
39 let metadata: std::collections::HashMap<String, String> = req
40 .headers()
41 .iter()
42 .filter(|(k, _)| {
43 let k = k.as_str();
44 !k.starts_with(':')
45 && k != "content-type"
46 && k != "te"
47 && k != "user-agent"
48 && k != "grpc-timeout"
49 && k != "grpc-encoding"
50 && k != "grpc-accept-encoding"
51 })
52 .filter_map(|(k, v)| {
53 v.to_str()
54 .ok()
55 .map(|v| (k.as_str().to_string(), v.to_string()))
56 })
57 .collect();
58
59 let body_bytes = match req.into_body().collect().await {
60 Ok(collected) => collected.to_bytes(),
61 Err(e) => return grpc_response(Bytes::new(), 13, &format!("body: {e}")),
62 };
63
64 let proto_bytes = if body_bytes.len() >= 5 {
65 body_bytes.slice(5..)
66 } else {
67 body_bytes
68 };
69
70 debug!(service, method, payload_len = proto_bytes.len(), "gRPC call");
71
72 let envelope = GrpcEnvelope {
73 service,
74 method,
75 payload: proto_bytes.to_vec(),
76 metadata,
77 };
78 let encoded = match rmp_serde::to_vec_named(&envelope) {
79 Ok(v) => Bytes::from(v),
80 Err(e) => return grpc_response(Bytes::new(), 13, &format!("encode: {e}")),
81 };
82
83 let raw_response = match state.executor.execute_method("grpc.call", encoded).await {
84 Ok(v) => v,
85 Err(e) => return grpc_response(Bytes::new(), 13, &format!("worker: {e}")),
86 };
87
88 let proto_response = match rmp_serde::from_slice::<rmpv::Value>(&raw_response) {
89 Ok(rmpv::Value::Binary(b)) => b,
90 Ok(rmpv::Value::String(s)) => s.into_bytes(),
91 Ok(other) => {
92 return grpc_response(
93 Bytes::new(),
94 13,
95 &format!("unexpected response type: {other:?}"),
96 );
97 }
98 Err(_) => raw_response.to_vec(),
99 };
100
101 let mut framed = Vec::with_capacity(5 + proto_response.len());
103 framed.push(0u8);
104 framed.extend_from_slice(&(proto_response.len() as u32).to_be_bytes());
105 framed.extend_from_slice(&proto_response);
106
107 grpc_response(Bytes::from(framed), 0, "")
108}
109
110fn grpc_response(data: Bytes, status: u32, message: &str) -> Response<GrpcBody> {
115 let mut trailers = HeaderMap::new();
116 trailers.insert("grpc-status", HeaderValue::from(status));
117 if !message.is_empty() {
118 if let Ok(v) = HeaderValue::from_str(message) {
119 trailers.insert("grpc-message", v);
120 }
121 }
122
123 Response::builder()
124 .status(200)
125 .header("content-type", "application/grpc")
126 .body(GrpcBody {
127 data: Some(data),
128 trailers: Some(trailers),
129 })
130 .unwrap()
131}
132
133pub struct GrpcBody {
135 data: Option<Bytes>,
136 trailers: Option<HeaderMap>,
137}
138
139impl http_body::Body for GrpcBody {
140 type Data = Bytes;
141 type Error = std::convert::Infallible;
142
143 fn poll_frame(
144 mut self: Pin<&mut Self>,
145 _cx: &mut Context<'_>,
146 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
147 if let Some(data) = self.data.take() {
148 if !data.is_empty() {
149 return Poll::Ready(Some(Ok(Frame::data(data))));
150 }
151 }
152 if let Some(trailers) = self.trailers.take() {
153 return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
154 }
155 Poll::Ready(None)
156 }
157}