mecha10_core/rpc.rs
1//! Request/Response (RPC) Pattern
2//!
3//! Provides request/response helpers on top of the pub/sub messaging system.
4//! Enables command-response interactions where a node sends a request and waits
5//! for a response with a timeout.
6//!
7//! # Architecture
8//!
9//! - Uses correlation IDs to match requests with responses
10//! - Request and response share the same base topic
11//! - Request topic: `{base_topic}/request`
12//! - Response topic: `{base_topic}/response`
13//! - Each request gets a unique UUID correlation ID
14//! - Responder includes the correlation ID in the response
15//! - Requester filters responses by correlation ID
16//! - Errors from handlers are automatically sent as error responses
17//! - Requesters receive errors as `Mecha10Error::MessagingError`
18//!
19//! # Example
20//!
21//! ```rust
22//! use mecha10::prelude::*;
23//! use mecha10::rpc::RpcExt;
24//! use serde::{Serialize, Deserialize};
25//!
26//! #[derive(Debug, Serialize, Deserialize, Clone)]
27//! struct Command {
28//! action: String,
29//! params: Vec<f32>,
30//! }
31//!
32//! #[derive(Debug, Serialize, Deserialize, Clone)]
33//! struct CommandResult {
34//! success: bool,
35//! message: String,
36//! }
37//!
38//! // Request side
39//! # async fn request_example(ctx: &Context) -> Result<()> {
40//! let cmd = Command {
41//! action: "move".to_string(),
42//! params: vec![1.0, 2.0],
43//! };
44//!
//! let result: CommandResult = ctx
//!     .request("/motor/control", &cmd)
//!     .timeout(Duration::from_secs(5))
//!     .execute()
//!     .await?;
49//!
50//! println!("Result: {} - {}", result.success, result.message);
51//! # Ok(())
52//! # }
53//!
54//! // Response side
55//! # async fn response_example(ctx: &Context) -> Result<()> {
56//! ctx.respond("/motor/control", |cmd: Command| async move {
57//! // Process command
58//! execute_command(&cmd).await?;
59//!
60//! Ok(CommandResult {
61//! success: true,
62//! message: "Command executed".to_string(),
63//! })
64//! }).await?;
65//! # Ok(())
66//! # }
67//! ```
68
69use crate::context::{Context, Receiver};
70use crate::error::{Mecha10Error, Result};
71use crate::messages::Message;
72use serde::{de::DeserializeOwned, Deserialize, Serialize};
73use std::future::Future;
74use std::time::Duration;
75use uuid::Uuid;
76
77/// Request envelope with correlation ID
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct Request<T> {
80 /// Unique correlation ID for matching request/response
81 pub correlation_id: String,
82
83 /// Request payload
84 pub payload: T,
85}
86
87impl<T: Message> Message for Request<T> {}
88
89/// Response envelope with correlation ID
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct Response<T> {
92 /// Correlation ID from the original request
93 pub correlation_id: String,
94
95 /// Response payload (success or error)
96 pub payload: RpcResult<T>,
97}
98
99impl<T: Message> Message for Response<T> {}
100
101/// RPC result that can represent success or error
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum RpcResult<T> {
104 /// Successful response
105 Ok(T),
106 /// Error response
107 Err(RpcError),
108}
109
110/// RPC error details
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct RpcError {
113 /// Error message
114 pub message: String,
115 /// Error code (optional)
116 pub code: Option<String>,
117}
118
119/// Request builder for configuring timeouts
120pub struct RequestBuilder<'a, Req, Resp> {
121 ctx: &'a Context,
122 topic: String,
123 request: Request<Req>,
124 timeout: Duration,
125 _phantom: std::marker::PhantomData<Resp>,
126}
127
128impl<'a, Req, Resp> RequestBuilder<'a, Req, Resp>
129where
130 Req: Message + Serialize + Clone,
131 Resp: Message + DeserializeOwned + Send + 'static,
132{
133 /// Set the timeout for the request
134 pub fn timeout(mut self, duration: Duration) -> Self {
135 self.timeout = duration;
136 self
137 }
138
139 /// Execute the request and wait for response
140 pub async fn execute(self) -> Result<Resp> {
141 let request_topic = format!("{}/request", self.topic);
142 let response_topic = format!("{}/response", self.topic);
143 let correlation_id = self.request.correlation_id.clone();
144
145 // Subscribe to response topic BEFORE publishing request (to avoid race condition)
146 let mut responses = self.ctx.subscribe_raw::<Response<Resp>>(&response_topic).await?;
147
148 // Publish the request
149 self.ctx.publish_raw(&request_topic, &self.request).await?;
150
151 // Wait for response with timeout
152 let deadline = tokio::time::Instant::now() + self.timeout;
153
154 loop {
155 match tokio::time::timeout_at(deadline, responses.recv()).await {
156 Ok(Some(response)) => {
157 // Check if correlation ID matches
158 if response.correlation_id == correlation_id {
159 // Unwrap RpcResult
160 match response.payload {
161 RpcResult::Ok(payload) => return Ok(payload),
162 RpcResult::Err(err) => {
163 return Err(Mecha10Error::MessagingError {
164 message: format!("RPC error: {}", err.message),
165 suggestion: "Check the handler implementation for the error cause".to_string(),
166 });
167 }
168 }
169 }
170 // Wrong correlation ID, keep waiting
171 continue;
172 }
173 Ok(None) => {
174 return Err(Mecha10Error::MessagingError {
175 message: "Response channel closed".to_string(),
176 suggestion: "Check if responder node is running".to_string(),
177 });
178 }
179 Err(_) => {
180 return Err(Mecha10Error::MessagingError {
181 message: format!("Request to {} timed out after {:?}", self.topic, self.timeout),
182 suggestion: "Check if responder node is running and responsive".to_string(),
183 });
184 }
185 }
186 }
187 }
188}
189
190/// RPC extension trait for Context
191pub trait RpcExt {
192 /// Send a request and wait for response
193 ///
194 /// # Arguments
195 ///
196 /// * `topic` - Base topic for the RPC (will use `{topic}/request` and `{topic}/response`)
197 /// * `request` - Request payload
198 ///
199 /// # Returns
200 ///
201 /// A `RequestBuilder` that can be configured with `.timeout()` and executed with `.await`
202 ///
203 /// # Example
204 ///
205 /// ```rust
206 /// use mecha10::prelude::*;
207 /// use mecha10::rpc::RpcExt;
208 ///
209 /// # async fn example(ctx: &Context) -> Result<()> {
210 /// let result: Response = ctx
211 /// .request("/motor/control", &command)
212 /// .timeout(Duration::from_secs(5))
213 /// .await?;
214 /// # Ok(())
215 /// # }
216 /// ```
217 fn request<Req, Resp>(&self, topic: &str, request: &Req) -> RequestBuilder<'_, Req, Resp>
218 where
219 Req: Message + Serialize + Clone,
220 Resp: Message + DeserializeOwned + Send + 'static;
221
222 /// Respond to requests on a topic
223 ///
224 /// # Arguments
225 ///
226 /// * `topic` - Base topic for the RPC
227 /// * `handler` - Async function that processes requests and returns responses
228 ///
229 /// # Example
230 ///
231 /// ```rust
232 /// use mecha10::prelude::*;
233 /// use mecha10::rpc::RpcExt;
234 ///
235 /// # async fn example(ctx: &Context) -> Result<()> {
236 /// ctx.respond("/motor/control", |cmd: Command| async move {
237 /// execute_command(&cmd).await?;
238 /// Ok(CommandResult { success: true })
239 /// }).await?;
240 /// # Ok(())
241 /// # }
242 /// ```
243 fn respond<Req, Resp, F, Fut>(&self, topic: &str, handler: F) -> impl Future<Output = Result<()>>
244 where
245 Req: Message + DeserializeOwned + Send + 'static,
246 Resp: Message + Serialize + Send + 'static,
247 F: Fn(Req) -> Fut + Send + 'static,
248 Fut: Future<Output = Result<Resp>> + Send;
249
250 /// Low-level: Subscribe to a topic without type-safe wrapper
251 fn subscribe_raw<T>(&self, topic: &str) -> impl Future<Output = Result<Receiver<T>>>
252 where
253 T: Message + DeserializeOwned + Send + 'static;
254
255 /// Low-level: Publish to a topic without type-safe wrapper
256 fn publish_raw<T>(&self, topic: &str, message: &T) -> impl Future<Output = Result<()>>
257 where
258 T: Message + Serialize;
259}
260
261impl RpcExt for Context {
262 fn request<Req, Resp>(&self, topic: &str, request: &Req) -> RequestBuilder<'_, Req, Resp>
263 where
264 Req: Message + Serialize + Clone,
265 Resp: Message + DeserializeOwned + Send + 'static,
266 {
267 RequestBuilder {
268 ctx: self,
269 topic: topic.to_string(),
270 request: Request {
271 correlation_id: Uuid::new_v4().to_string(),
272 payload: request.clone(),
273 },
274 timeout: Duration::from_secs(10), // Default 10 second timeout
275 _phantom: std::marker::PhantomData,
276 }
277 }
278
279 async fn respond<Req, Resp, F, Fut>(&self, topic: &str, handler: F) -> Result<()>
280 where
281 Req: Message + DeserializeOwned + Send + 'static,
282 Resp: Message + Serialize + Send + 'static,
283 F: Fn(Req) -> Fut + Send + 'static,
284 Fut: Future<Output = Result<Resp>> + Send,
285 {
286 let request_topic = format!("{}/request", topic);
287 let response_topic = format!("{}/response", topic);
288
289 // Subscribe to requests
290 let mut requests = self.subscribe_raw::<Request<Req>>(&request_topic).await?;
291
292 // Spawn task to handle requests
293 let ctx = self.clone();
294 tokio::spawn(async move {
295 while let Some(request) = requests.recv().await {
296 let correlation_id = request.correlation_id.clone();
297
298 // Call handler and wrap result
299 let result = match handler(request.payload).await {
300 Ok(response_payload) => RpcResult::Ok(response_payload),
301 Err(e) => {
302 tracing::error!(
303 correlation_id = %correlation_id,
304 error = %e,
305 "RPC handler failed"
306 );
307 RpcResult::Err(RpcError {
308 message: e.to_string(),
309 code: None,
310 })
311 }
312 };
313
314 let response = Response {
315 correlation_id: correlation_id.clone(),
316 payload: result,
317 };
318
319 // Send response (either success or error)
320 if let Err(e) = ctx.publish_raw(&response_topic, &response).await {
321 tracing::error!(
322 correlation_id = %correlation_id,
323 topic = %response_topic,
324 error = %e,
325 "Failed to publish RPC response"
326 );
327 }
328 }
329 });
330
331 Ok(())
332 }
333
334 async fn subscribe_raw<T>(&self, topic: &str) -> Result<Receiver<T>>
335 where
336 T: Message + DeserializeOwned + Send + 'static,
337 {
338 Context::subscribe_raw(self, topic).await
339 }
340
341 async fn publish_raw<T>(&self, topic: &str, message: &T) -> Result<()>
342 where
343 T: Message + Serialize,
344 {
345 Context::publish_raw(self, topic, message).await
346 }
347}