Skip to main content

spikard_http/grpc/
handler.rs

1//! Core GrpcHandler trait for language-agnostic gRPC request handling
2//!
3//! This module defines the handler trait that language bindings implement
4//! to handle gRPC requests. Similar to the HttpHandler pattern but designed
5//! specifically for gRPC's protobuf-based message format.
6
7use bytes::Bytes;
8use futures_util::StreamExt;
9use std::future::Future;
10use std::pin::Pin;
11use tonic::metadata::MetadataMap;
12
13use super::streaming::MessageStream;
14
15/// RPC mode enum for declaring handler capabilities
16///
17/// Indicates which type of RPC this handler supports. This is used at
18/// handler registration to route requests to the appropriate handler method.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
20pub enum RpcMode {
21    /// Unary RPC: single request, single response
22    Unary,
23    /// Server streaming RPC: single request, stream of responses
24    ServerStreaming,
25    /// Client streaming RPC: stream of requests, single response
26    ClientStreaming,
27    /// Bidirectional streaming RPC: stream of requests, stream of responses
28    BidirectionalStreaming,
29}
30
31/// gRPC request data passed to handlers
32///
33/// Contains the parsed components of a gRPC request:
34/// - Service and method names from the request path
35/// - Serialized protobuf payload as bytes
36/// - Request metadata (headers)
37#[derive(Debug, Clone)]
38pub struct GrpcRequestData {
39    /// Fully qualified service name (e.g., "mypackage.MyService")
40    pub service_name: String,
41    /// Method name (e.g., "GetUser")
42    pub method_name: String,
43    /// Serialized protobuf message bytes
44    pub payload: Bytes,
45    /// gRPC metadata (similar to HTTP headers)
46    pub metadata: MetadataMap,
47}
48
49/// gRPC response data returned by handlers
50///
51/// Contains the serialized protobuf response and any metadata to include
52/// in the response headers.
53#[derive(Debug, Clone)]
54pub struct GrpcResponseData {
55    /// Serialized protobuf message bytes
56    pub payload: Bytes,
57    /// gRPC metadata to include in response (similar to HTTP headers)
58    pub metadata: MetadataMap,
59}
60
61/// Result type for gRPC handlers
62///
63/// Returns either:
64/// - Ok(GrpcResponseData): A successful response with payload and metadata
65/// - Err(tonic::Status): A gRPC error status with code and message
66pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
67
68/// Handler trait for gRPC requests
69///
70/// This is the language-agnostic interface that all gRPC handler implementations
71/// must satisfy. Language bindings (Python, TypeScript, Ruby, PHP) will implement
72/// this trait to bridge their runtime to Spikard's gRPC server.
73///
74/// Handlers declare their RPC mode (unary vs streaming) via the `rpc_mode()` method.
75/// The gRPC server uses this to route requests to either `call()` or `call_server_stream()`.
76///
77/// # Examples
78///
79/// ## Basic unary handler
80///
81/// ```ignore
82/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, GrpcResponseData, GrpcHandlerResult};
83/// use bytes::Bytes;
84/// use std::pin::Pin;
85/// use std::future::Future;
86///
87/// struct UnaryHandler;
88///
89/// impl GrpcHandler for UnaryHandler {
90///     fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
91///         Box::pin(async move {
92///             // Parse request.payload using protobuf deserialization
93///             let user_id = extract_id_from_payload(&request.payload);
94///
95///             // Process business logic
96///             let response_data = lookup_user(user_id).await?;
97///
98///             // Serialize response and return
99///             Ok(GrpcResponseData {
100///                 payload: serialize_user(&response_data),
101///                 metadata: tonic::metadata::MetadataMap::new(),
102///             })
103///         })
104///     }
105///
106///     fn service_name(&self) -> &str {
107///         "users.UserService"
108///     }
109///
110///     // Default rpc_mode() returns RpcMode::Unary
111/// }
112/// ```
113///
114/// ## Server streaming handler
115///
116/// ```ignore
117/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, MessageStream};
118/// use bytes::Bytes;
119/// use std::pin::Pin;
120/// use std::future::Future;
121///
122/// struct StreamingHandler;
123///
124/// impl GrpcHandler for StreamingHandler {
125///     fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send>> {
126///         // Unary call not used for streaming handlers, but must be implemented
127///         Box::pin(async {
128///             Err(tonic::Status::unimplemented("Use server streaming instead"))
129///         })
130///     }
131///
132///     fn service_name(&self) -> &str {
133///         "events.EventService"
134///     }
135///
136///     fn rpc_mode(&self) -> RpcMode {
137///         RpcMode::ServerStreaming
138///     }
139///
140///     fn call_server_stream(
141///         &self,
142///         request: GrpcRequestData,
143///     ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
144///         Box::pin(async move {
145///             // Parse request to extract stream criteria (e.g., user_id)
146///             let user_id = extract_id_from_payload(&request.payload);
147///
148///             // Generate messages (e.g., fetch events from database)
149///             let events = fetch_user_events(user_id).await?;
150///             let mut messages = Vec::new();
151///
152///             for event in events {
153///                 let serialized = serialize_event(&event);
154///                 messages.push(serialized);
155///             }
156///
157///             // Convert to stream and return
158///             Ok(Box::pin(futures_util::stream::iter(messages.into_iter().map(Ok))))
159///         })
160///     }
161/// }
162/// ```
163///
164/// # Dispatch Behavior
165///
166/// The gRPC server uses `rpc_mode()` to determine which handler method to call:
167///
168/// | RpcMode | Handler Method | Use Case |
169/// |---------|---|---|
170/// | `Unary` | `call()` | Single request, single response |
171/// | `ServerStreaming` | `call_server_stream()` | Single request, multiple responses |
172/// | `ClientStreaming` | `call_client_stream()` | Multiple requests, single response |
173/// | `BidirectionalStreaming` | `call_bidi_stream()` | Multiple requests, multiple responses |
174///
175/// # Error Handling
176///
177/// Both `call()` and `call_server_stream()` return gRPC error status values:
178///
179/// ```ignore
180/// // Return a specific gRPC error
181/// fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
182///     Box::pin(async {
183///         let Some(id) = parse_id(&request.payload) else {
184///             return Err(tonic::Status::invalid_argument("Missing user ID"));
185///         };
186///
187///         // ... process ...
188///     })
189/// }
190/// ```
191pub trait GrpcHandler: Send + Sync {
192    /// Handle a gRPC request
193    ///
194    /// Takes the parsed request data and returns a future that resolves to either:
195    /// - Ok(GrpcResponseData): A successful response
196    /// - Err(tonic::Status): An error with appropriate gRPC status code
197    ///
198    /// # Arguments
199    ///
200    /// * `request` - The parsed gRPC request containing service/method names,
201    ///   serialized payload, and metadata
202    ///
203    /// # Returns
204    ///
205    /// A future that resolves to a GrpcHandlerResult
206    fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send + '_>>;
207
208    /// Get the fully qualified service name this handler serves
209    ///
210    /// This is used for routing requests to the appropriate handler.
211    /// Should return the fully qualified service name as defined in the .proto file.
212    ///
213    /// # Example
214    ///
215    /// For a service defined as:
216    /// ```proto
217    /// package mypackage;
218    /// service UserService { ... }
219    /// ```
220    ///
221    /// This should return "mypackage.UserService"
222    fn service_name(&self) -> &str;
223
224    /// Get the RPC mode this handler supports
225    ///
226    /// Returns the type of RPC this handler implements. Used at handler registration
227    /// to route requests to the appropriate handler method.
228    ///
229    /// Default implementation returns `RpcMode::Unary` for backward compatibility.
230    fn rpc_mode(&self) -> RpcMode {
231        RpcMode::Unary
232    }
233
234    /// Handle a server streaming RPC request
235    ///
236    /// Takes a single request and returns a stream of response messages.
237    /// Default implementation adapts the unary `call()` response into a
238    /// single-message stream.
239    ///
240    /// # Arguments
241    ///
242    /// * `request` - The parsed gRPC request
243    ///
244    /// # Returns
245    ///
246    /// A future that resolves to either a stream of messages or an error status
247    fn call_server_stream(
248        &self,
249        request: GrpcRequestData,
250    ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send + '_>> {
251        let unary_future = self.call(request);
252        Box::pin(async move {
253            let response = unary_future.await?;
254            Ok(crate::grpc::streaming::single_message_stream(response.payload))
255        })
256    }
257
258    /// Handle a client streaming RPC call
259    ///
260    /// Takes a stream of request messages and returns a single response message.
261    /// Default implementation adapts to unary by requiring exactly one
262    /// request message in the stream.
263    fn call_client_stream(
264        &self,
265        request: crate::grpc::streaming::StreamingRequest,
266    ) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send + '_>> {
267        Box::pin(async move {
268            let crate::grpc::streaming::StreamingRequest {
269                service_name,
270                method_name,
271                mut message_stream,
272                metadata,
273            } = request;
274
275            let first_message = match message_stream.next().await {
276                Some(Ok(message)) => message,
277                Some(Err(status)) => return Err(status),
278                None => {
279                    return Err(tonic::Status::invalid_argument(
280                        "Client stream is empty; unary fallback requires exactly one request message",
281                    ));
282                }
283            };
284
285            if let Some(next_message) = message_stream.next().await {
286                match next_message {
287                    Ok(_) => {
288                        return Err(tonic::Status::invalid_argument(
289                            "Unary fallback requires exactly one request message",
290                        ));
291                    }
292                    Err(status) => return Err(status),
293                }
294            }
295
296            self.call(GrpcRequestData {
297                service_name,
298                method_name,
299                payload: first_message,
300                metadata,
301            })
302            .await
303        })
304    }
305
306    /// Handle a bidirectional streaming RPC call
307    ///
308    /// Takes a stream of request messages and returns a stream of response messages.
309    /// Default implementation adapts to unary by requiring exactly one
310    /// request message and returning a single-message response stream.
311    fn call_bidi_stream(
312        &self,
313        request: crate::grpc::streaming::StreamingRequest,
314    ) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send + '_>> {
315        Box::pin(async move {
316            let response = self.call_client_stream(request).await?;
317            Ok(crate::grpc::streaming::single_message_stream(response.payload))
318        })
319    }
320}