spikard_http/grpc/handler.rs
//! Core GrpcHandler trait for language-agnostic gRPC request handling
//!
//! This module defines the handler trait that language bindings implement
//! to handle gRPC requests. It is similar to the HttpHandler pattern, but is
//! designed specifically for gRPC's protobuf-based message format.
6
7use bytes::Bytes;
8use futures_util::StreamExt;
9use std::future::Future;
10use std::pin::Pin;
11use tonic::metadata::MetadataMap;
12
13use super::streaming::MessageStream;
14
/// The call shape a gRPC handler declares for itself
///
/// A handler reports exactly one of these modes. The value is read at handler
/// registration so that incoming requests are routed to the matching handler
/// method.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RpcMode {
    /// One request message in, one response message out
    Unary,
    /// One request message in, a stream of response messages out
    ServerStreaming,
    /// A stream of request messages in, one response message out
    ClientStreaming,
    /// A stream of request messages in, a stream of response messages out
    BidirectionalStreaming,
}
30
31/// gRPC request data passed to handlers
32///
33/// Contains the parsed components of a gRPC request:
34/// - Service and method names from the request path
35/// - Serialized protobuf payload as bytes
36/// - Request metadata (headers)
37#[derive(Debug, Clone)]
38pub struct GrpcRequestData {
39 /// Fully qualified service name (e.g., "mypackage.MyService")
40 pub service_name: String,
41 /// Method name (e.g., "GetUser")
42 pub method_name: String,
43 /// Serialized protobuf message bytes
44 pub payload: Bytes,
45 /// gRPC metadata (similar to HTTP headers)
46 pub metadata: MetadataMap,
47}
48
49/// gRPC response data returned by handlers
50///
51/// Contains the serialized protobuf response and any metadata to include
52/// in the response headers.
53#[derive(Debug, Clone)]
54pub struct GrpcResponseData {
55 /// Serialized protobuf message bytes
56 pub payload: Bytes,
57 /// gRPC metadata to include in response (similar to HTTP headers)
58 pub metadata: MetadataMap,
59}
60
61/// Result type for gRPC handlers
62///
63/// Returns either:
64/// - Ok(GrpcResponseData): A successful response with payload and metadata
65/// - Err(tonic::Status): A gRPC error status with code and message
66pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
67
68/// Handler trait for gRPC requests
69///
70/// This is the language-agnostic interface that all gRPC handler implementations
71/// must satisfy. Language bindings (Python, TypeScript, Ruby, PHP) will implement
72/// this trait to bridge their runtime to Spikard's gRPC server.
73///
74/// Handlers declare their RPC mode (unary vs streaming) via the `rpc_mode()` method.
75/// The gRPC server uses this to route requests to either `call()` or `call_server_stream()`.
76///
77/// # Examples
78///
79/// ## Basic unary handler
80///
81/// ```ignore
82/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, GrpcResponseData, GrpcHandlerResult};
83/// use bytes::Bytes;
84/// use std::pin::Pin;
85/// use std::future::Future;
86///
87/// struct UnaryHandler;
88///
89/// impl GrpcHandler for UnaryHandler {
90/// fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
91/// Box::pin(async move {
92/// // Parse request.payload using protobuf deserialization
93/// let user_id = extract_id_from_payload(&request.payload);
94///
95/// // Process business logic
96/// let response_data = lookup_user(user_id).await?;
97///
98/// // Serialize response and return
99/// Ok(GrpcResponseData {
100/// payload: serialize_user(&response_data),
101/// metadata: tonic::metadata::MetadataMap::new(),
102/// })
103/// })
104/// }
105///
106/// fn service_name(&self) -> &str {
107/// "users.UserService"
108/// }
109///
110/// // Default rpc_mode() returns RpcMode::Unary
111/// }
112/// ```
113///
114/// ## Server streaming handler
115///
116/// ```ignore
117/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, MessageStream};
118/// use bytes::Bytes;
119/// use std::pin::Pin;
120/// use std::future::Future;
121///
122/// struct StreamingHandler;
123///
124/// impl GrpcHandler for StreamingHandler {
125/// fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send>> {
126/// // Unary call not used for streaming handlers, but must be implemented
127/// Box::pin(async {
128/// Err(tonic::Status::unimplemented("Use server streaming instead"))
129/// })
130/// }
131///
132/// fn service_name(&self) -> &str {
133/// "events.EventService"
134/// }
135///
136/// fn rpc_mode(&self) -> RpcMode {
137/// RpcMode::ServerStreaming
138/// }
139///
140/// fn call_server_stream(
141/// &self,
142/// request: GrpcRequestData,
143/// ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
144/// Box::pin(async move {
145/// // Parse request to extract stream criteria (e.g., user_id)
146/// let user_id = extract_id_from_payload(&request.payload);
147///
148/// // Generate messages (e.g., fetch events from database)
149/// let events = fetch_user_events(user_id).await?;
150/// let mut messages = Vec::new();
151///
152/// for event in events {
153/// let serialized = serialize_event(&event);
154/// messages.push(serialized);
155/// }
156///
157/// // Convert to stream and return
158/// Ok(Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok))))
159/// })
160/// }
161/// }
162/// ```
163///
164/// # Dispatch Behavior
165///
166/// The gRPC server uses `rpc_mode()` to determine which handler method to call:
167///
168/// | RpcMode | Handler Method | Use Case |
169/// |---------|---|---|
170/// | `Unary` | `call()` | Single request, single response |
171/// | `ServerStreaming` | `call_server_stream()` | Single request, multiple responses |
172/// | `ClientStreaming` | `call_client_stream()` | Multiple requests, single response |
173/// | `BidirectionalStreaming` | `call_bidi_stream()` | Multiple requests, multiple responses |
174///
175/// # Error Handling
176///
177/// Both `call()` and `call_server_stream()` return gRPC error status values:
178///
179/// ```ignore
180/// // Return a specific gRPC error
181/// fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
182/// Box::pin(async {
183/// let Some(id) = parse_id(&request.payload) else {
184/// return Err(tonic::Status::invalid_argument("Missing user ID"));
185/// };
186///
187/// // ... process ...
188/// })
189/// }
190/// ```
191pub trait GrpcHandler: Send + Sync {
192 /// Handle a gRPC request
193 ///
194 /// Takes the parsed request data and returns a future that resolves to either:
195 /// - Ok(GrpcResponseData): A successful response
196 /// - Err(tonic::Status): An error with appropriate gRPC status code
197 ///
198 /// # Arguments
199 ///
200 /// * `request` - The parsed gRPC request containing service/method names,
201 /// serialized payload, and metadata
202 ///
203 /// # Returns
204 ///
205 /// A future that resolves to a GrpcHandlerResult
206 fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send + '_>>;
207
208 /// Get the fully qualified service name this handler serves
209 ///
210 /// This is used for routing requests to the appropriate handler.
211 /// Should return the fully qualified service name as defined in the .proto file.
212 ///
213 /// # Example
214 ///
215 /// For a service defined as:
216 /// ```proto
217 /// package mypackage;
218 /// service UserService { ... }
219 /// ```
220 ///
221 /// This should return "mypackage.UserService"
222 fn service_name(&self) -> &str;
223
224 /// Get the RPC mode this handler supports
225 ///
226 /// Returns the type of RPC this handler implements. Used at handler registration
227 /// to route requests to the appropriate handler method.
228 ///
229 /// Default implementation returns `RpcMode::Unary` for backward compatibility.
230 fn rpc_mode(&self) -> RpcMode {
231 RpcMode::Unary
232 }
233
234 /// Handle a server streaming RPC request
235 ///
236 /// Takes a single request and returns a stream of response messages.
237 /// Default implementation adapts the unary `call()` response into a
238 /// single-message stream.
239 ///
240 /// # Arguments
241 ///
242 /// * `request` - The parsed gRPC request
243 ///
244 /// # Returns
245 ///
246 /// A future that resolves to either a stream of messages or an error status
247 fn call_server_stream(
248 &self,
249 request: GrpcRequestData,
250 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send + '_>> {
251 let unary_future = self.call(request);
252 Box::pin(async move {
253 let response = unary_future.await?;
254 Ok(crate::grpc::streaming::single_message_stream(response.payload))
255 })
256 }
257
258 /// Handle a client streaming RPC call
259 ///
260 /// Takes a stream of request messages and returns a single response message.
261 /// Default implementation adapts to unary by requiring exactly one
262 /// request message in the stream.
263 fn call_client_stream(
264 &self,
265 request: crate::grpc::streaming::StreamingRequest,
266 ) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send + '_>> {
267 Box::pin(async move {
268 let crate::grpc::streaming::StreamingRequest {
269 service_name,
270 method_name,
271 mut message_stream,
272 metadata,
273 } = request;
274
275 let first_message = match message_stream.next().await {
276 Some(Ok(message)) => message,
277 Some(Err(status)) => return Err(status),
278 None => {
279 return Err(tonic::Status::invalid_argument(
280 "Client stream is empty; unary fallback requires exactly one request message",
281 ));
282 }
283 };
284
285 if let Some(next_message) = message_stream.next().await {
286 match next_message {
287 Ok(_) => {
288 return Err(tonic::Status::invalid_argument(
289 "Unary fallback requires exactly one request message",
290 ));
291 }
292 Err(status) => return Err(status),
293 }
294 }
295
296 self.call(GrpcRequestData {
297 service_name,
298 method_name,
299 payload: first_message,
300 metadata,
301 })
302 .await
303 })
304 }
305
306 /// Handle a bidirectional streaming RPC call
307 ///
308 /// Takes a stream of request messages and returns a stream of response messages.
309 /// Default implementation adapts to unary by requiring exactly one
310 /// request message and returning a single-message response stream.
311 fn call_bidi_stream(
312 &self,
313 request: crate::grpc::streaming::StreamingRequest,
314 ) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send + '_>> {
315 Box::pin(async move {
316 let response = self.call_client_stream(request).await?;
317 Ok(crate::grpc::streaming::single_message_stream(response.payload))
318 })
319 }
320}