mecha10_core/
rpc.rs

1//! Request/Response (RPC) Pattern
2//!
3//! Provides request/response helpers on top of the pub/sub messaging system.
4//! Enables command-response interactions where a node sends a request and waits
5//! for a response with a timeout.
6//!
7//! # Architecture
8//!
9//! - Uses correlation IDs to match requests with responses
10//! - Request and response share the same base topic
11//! - Request topic: `{base_topic}/request`
12//! - Response topic: `{base_topic}/response`
13//! - Each request gets a unique UUID correlation ID
14//! - Responder includes the correlation ID in the response
15//! - Requester filters responses by correlation ID
16//! - Errors from handlers are automatically sent as error responses
17//! - Requesters receive errors as `Mecha10Error::MessagingError`
18//!
19//! # Example
20//!
21//! ```rust
22//! use mecha10::prelude::*;
23//! use mecha10::rpc::RpcExt;
24//! use serde::{Serialize, Deserialize};
25//!
26//! #[derive(Debug, Serialize, Deserialize, Clone)]
27//! struct Command {
28//!     action: String,
29//!     params: Vec<f32>,
30//! }
31//!
32//! #[derive(Debug, Serialize, Deserialize, Clone)]
33//! struct CommandResult {
34//!     success: bool,
35//!     message: String,
36//! }
37//!
38//! // Request side
39//! # async fn request_example(ctx: &Context) -> Result<()> {
40//! let cmd = Command {
41//!     action: "move".to_string(),
42//!     params: vec![1.0, 2.0],
43//! };
44//!
45//! let result: CommandResult = ctx
46//!     .request("/motor/control", &cmd)
47//!     .timeout(Duration::from_secs(5))
48//!     .await?;
49//!
50//! println!("Result: {} - {}", result.success, result.message);
51//! # Ok(())
52//! # }
53//!
54//! // Response side
55//! # async fn response_example(ctx: &Context) -> Result<()> {
56//! ctx.respond("/motor/control", |cmd: Command| async move {
57//!     // Process command
58//!     execute_command(&cmd).await?;
59//!
60//!     Ok(CommandResult {
61//!         success: true,
62//!         message: "Command executed".to_string(),
63//!     })
64//! }).await?;
65//! # Ok(())
66//! # }
67//! ```
68
69use crate::context::{Context, Receiver};
70use crate::error::{Mecha10Error, Result};
71use crate::messages::Message;
72use serde::{de::DeserializeOwned, Deserialize, Serialize};
73use std::future::Future;
74use std::time::Duration;
75use uuid::Uuid;
76
77/// Request envelope with correlation ID
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct Request<T> {
80    /// Unique correlation ID for matching request/response
81    pub correlation_id: String,
82
83    /// Request payload
84    pub payload: T,
85}
86
87impl<T: Message> Message for Request<T> {}
88
89/// Response envelope with correlation ID
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct Response<T> {
92    /// Correlation ID from the original request
93    pub correlation_id: String,
94
95    /// Response payload (success or error)
96    pub payload: RpcResult<T>,
97}
98
99impl<T: Message> Message for Response<T> {}
100
101/// RPC result that can represent success or error
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum RpcResult<T> {
104    /// Successful response
105    Ok(T),
106    /// Error response
107    Err(RpcError),
108}
109
110/// RPC error details
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct RpcError {
113    /// Error message
114    pub message: String,
115    /// Error code (optional)
116    pub code: Option<String>,
117}
118
119/// Request builder for configuring timeouts
120pub struct RequestBuilder<'a, Req, Resp> {
121    ctx: &'a Context,
122    topic: String,
123    request: Request<Req>,
124    timeout: Duration,
125    _phantom: std::marker::PhantomData<Resp>,
126}
127
128impl<'a, Req, Resp> RequestBuilder<'a, Req, Resp>
129where
130    Req: Message + Serialize + Clone,
131    Resp: Message + DeserializeOwned + Send + 'static,
132{
133    /// Set the timeout for the request
134    pub fn timeout(mut self, duration: Duration) -> Self {
135        self.timeout = duration;
136        self
137    }
138
139    /// Execute the request and wait for response
140    pub async fn execute(self) -> Result<Resp> {
141        let request_topic = format!("{}/request", self.topic);
142        let response_topic = format!("{}/response", self.topic);
143        let correlation_id = self.request.correlation_id.clone();
144
145        // Subscribe to response topic BEFORE publishing request (to avoid race condition)
146        let mut responses = self.ctx.subscribe_raw::<Response<Resp>>(&response_topic).await?;
147
148        // Publish the request
149        self.ctx.publish_raw(&request_topic, &self.request).await?;
150
151        // Wait for response with timeout
152        let deadline = tokio::time::Instant::now() + self.timeout;
153
154        loop {
155            match tokio::time::timeout_at(deadline, responses.recv()).await {
156                Ok(Some(response)) => {
157                    // Check if correlation ID matches
158                    if response.correlation_id == correlation_id {
159                        // Unwrap RpcResult
160                        match response.payload {
161                            RpcResult::Ok(payload) => return Ok(payload),
162                            RpcResult::Err(err) => {
163                                return Err(Mecha10Error::MessagingError {
164                                    message: format!("RPC error: {}", err.message),
165                                    suggestion: "Check the handler implementation for the error cause".to_string(),
166                                });
167                            }
168                        }
169                    }
170                    // Wrong correlation ID, keep waiting
171                    continue;
172                }
173                Ok(None) => {
174                    return Err(Mecha10Error::MessagingError {
175                        message: "Response channel closed".to_string(),
176                        suggestion: "Check if responder node is running".to_string(),
177                    });
178                }
179                Err(_) => {
180                    return Err(Mecha10Error::MessagingError {
181                        message: format!("Request to {} timed out after {:?}", self.topic, self.timeout),
182                        suggestion: "Check if responder node is running and responsive".to_string(),
183                    });
184                }
185            }
186        }
187    }
188}
189
190/// RPC extension trait for Context
191pub trait RpcExt {
192    /// Send a request and wait for response
193    ///
194    /// # Arguments
195    ///
196    /// * `topic` - Base topic for the RPC (will use `{topic}/request` and `{topic}/response`)
197    /// * `request` - Request payload
198    ///
199    /// # Returns
200    ///
201    /// A `RequestBuilder` that can be configured with `.timeout()` and executed with `.await`
202    ///
203    /// # Example
204    ///
205    /// ```rust
206    /// use mecha10::prelude::*;
207    /// use mecha10::rpc::RpcExt;
208    ///
209    /// # async fn example(ctx: &Context) -> Result<()> {
210    /// let result: Response = ctx
211    ///     .request("/motor/control", &command)
212    ///     .timeout(Duration::from_secs(5))
213    ///     .await?;
214    /// # Ok(())
215    /// # }
216    /// ```
217    fn request<Req, Resp>(&self, topic: &str, request: &Req) -> RequestBuilder<'_, Req, Resp>
218    where
219        Req: Message + Serialize + Clone,
220        Resp: Message + DeserializeOwned + Send + 'static;
221
222    /// Respond to requests on a topic
223    ///
224    /// # Arguments
225    ///
226    /// * `topic` - Base topic for the RPC
227    /// * `handler` - Async function that processes requests and returns responses
228    ///
229    /// # Example
230    ///
231    /// ```rust
232    /// use mecha10::prelude::*;
233    /// use mecha10::rpc::RpcExt;
234    ///
235    /// # async fn example(ctx: &Context) -> Result<()> {
236    /// ctx.respond("/motor/control", |cmd: Command| async move {
237    ///     execute_command(&cmd).await?;
238    ///     Ok(CommandResult { success: true })
239    /// }).await?;
240    /// # Ok(())
241    /// # }
242    /// ```
243    fn respond<Req, Resp, F, Fut>(&self, topic: &str, handler: F) -> impl Future<Output = Result<()>>
244    where
245        Req: Message + DeserializeOwned + Send + 'static,
246        Resp: Message + Serialize + Send + 'static,
247        F: Fn(Req) -> Fut + Send + 'static,
248        Fut: Future<Output = Result<Resp>> + Send;
249
250    /// Low-level: Subscribe to a topic without type-safe wrapper
251    fn subscribe_raw<T>(&self, topic: &str) -> impl Future<Output = Result<Receiver<T>>>
252    where
253        T: Message + DeserializeOwned + Send + 'static;
254
255    /// Low-level: Publish to a topic without type-safe wrapper
256    fn publish_raw<T>(&self, topic: &str, message: &T) -> impl Future<Output = Result<()>>
257    where
258        T: Message + Serialize;
259}
260
261impl RpcExt for Context {
262    fn request<Req, Resp>(&self, topic: &str, request: &Req) -> RequestBuilder<'_, Req, Resp>
263    where
264        Req: Message + Serialize + Clone,
265        Resp: Message + DeserializeOwned + Send + 'static,
266    {
267        RequestBuilder {
268            ctx: self,
269            topic: topic.to_string(),
270            request: Request {
271                correlation_id: Uuid::new_v4().to_string(),
272                payload: request.clone(),
273            },
274            timeout: Duration::from_secs(10), // Default 10 second timeout
275            _phantom: std::marker::PhantomData,
276        }
277    }
278
279    async fn respond<Req, Resp, F, Fut>(&self, topic: &str, handler: F) -> Result<()>
280    where
281        Req: Message + DeserializeOwned + Send + 'static,
282        Resp: Message + Serialize + Send + 'static,
283        F: Fn(Req) -> Fut + Send + 'static,
284        Fut: Future<Output = Result<Resp>> + Send,
285    {
286        let request_topic = format!("{}/request", topic);
287        let response_topic = format!("{}/response", topic);
288
289        // Subscribe to requests
290        let mut requests = self.subscribe_raw::<Request<Req>>(&request_topic).await?;
291
292        // Spawn task to handle requests
293        let ctx = self.clone();
294        tokio::spawn(async move {
295            while let Some(request) = requests.recv().await {
296                let correlation_id = request.correlation_id.clone();
297
298                // Call handler and wrap result
299                let result = match handler(request.payload).await {
300                    Ok(response_payload) => RpcResult::Ok(response_payload),
301                    Err(e) => {
302                        tracing::error!(
303                            correlation_id = %correlation_id,
304                            error = %e,
305                            "RPC handler failed"
306                        );
307                        RpcResult::Err(RpcError {
308                            message: e.to_string(),
309                            code: None,
310                        })
311                    }
312                };
313
314                let response = Response {
315                    correlation_id: correlation_id.clone(),
316                    payload: result,
317                };
318
319                // Send response (either success or error)
320                if let Err(e) = ctx.publish_raw(&response_topic, &response).await {
321                    tracing::error!(
322                        correlation_id = %correlation_id,
323                        topic = %response_topic,
324                        error = %e,
325                        "Failed to publish RPC response"
326                    );
327                }
328            }
329        });
330
331        Ok(())
332    }
333
334    async fn subscribe_raw<T>(&self, topic: &str) -> Result<Receiver<T>>
335    where
336        T: Message + DeserializeOwned + Send + 'static,
337    {
338        Context::subscribe_raw(self, topic).await
339    }
340
341    async fn publish_raw<T>(&self, topic: &str, message: &T) -> Result<()>
342    where
343        T: Message + Serialize,
344    {
345        Context::publish_raw(self, topic, message).await
346    }
347}