acton_htmx/agents/request_reply.rs
1//! Request-reply pattern helpers for web handler agents
2//!
3//! This module provides utilities for implementing the request-reply pattern
4//! in acton-reactive agents that need to respond synchronously to web handler
5//! requests. This pattern is commonly used when Axum extractors need to wait
6//! for agent responses.
7//!
8//! # Why This Pattern?
9//!
10//! Axum extractors run synchronously (from the handler's perspective) but need
11//! to communicate with asynchronous agents. The oneshot channel provides a way
12//! to:
13//! 1. Send a request to an agent
14//! 2. Await the agent's response
15//! 3. Return the result to the extractor
16//!
17//! The `Arc<Mutex<Option<...>>>` wrapping is required because:
18//! - `Arc`: Messages must be cloneable for agent supervision/retry
19//! - `Mutex`: Interior mutability to take the sender when responding
20//! - `Option`: Allows taking ownership of the sender exactly once
21//!
22//! # Example Usage
23//!
//! ```rust,ignore
//! use acton_reactive::prelude::*;
//! use tokio::sync::oneshot;
//! use crate::agents::request_reply::{create_request_reply, send_response, ResponseChannel};
//! use crate::auth::session::SessionId;
//!
//! #[derive(Clone, Debug)]
//! pub struct GetSessionRequest {
//!     pub session_id: SessionId,
//!     pub response_tx: ResponseChannel<Option<SessionData>>,
//! }
//!
//! impl GetSessionRequest {
//!     pub fn new(session_id: SessionId) -> (Self, oneshot::Receiver<Option<SessionData>>) {
//!         let (response_tx, rx) = create_request_reply();
//!         let request = Self { session_id, response_tx };
//!         (request, rx)
//!     }
//! }
//!
//! // In the agent's message handler:
//! async fn handle_get_session(request: GetSessionRequest) {
//!     let session = load_session(&request.session_id);
//!     let _ = send_response(request.response_tx, session).await;
//! }
//! ```
49
50use std::sync::Arc;
51use tokio::sync::{oneshot, Mutex};
52
53/// Standard response channel type for web handler requests
54///
55/// This type wraps a oneshot sender in `Arc<Mutex<Option<...>>>` to satisfy
56/// the requirements of acton-reactive message passing:
57/// - `Arc`: Messages must be `Clone` for agent supervision/retry
58/// - `Mutex`: Provides interior mutability to take ownership of the sender
59/// - `Option`: Allows taking the sender exactly once when responding
60///
61/// # Type Parameter
62///
63/// * `T` - The type of value to send through the channel
64pub type ResponseChannel<T> = Arc<Mutex<Option<oneshot::Sender<T>>>>;
65
66/// Create a request-reply pair with proper channel wrapping
67///
68/// This is a convenience function that creates both sides of a request-reply
69/// communication pattern. The sender is wrapped in `Arc<Mutex<Option<...>>>`
70/// for use in agent messages, while the receiver is returned directly for
71/// awaiting the response.
72///
73/// # Returns
74///
75/// A tuple of `(ResponseChannel<T>, oneshot::Receiver<T>)` where:
76/// - The `ResponseChannel` should be included in your request message
77/// - The `Receiver` should be awaited by the web handler
78///
79/// # Example
80///
81/// ```rust
82/// use crate::agents::request_reply::create_request_reply;
83///
84/// let (response_tx, rx) = create_request_reply();
85/// let request = MyRequest {
86/// data: "foo".to_string(),
87/// response_tx,
88/// };
89///
90/// // Send request to agent
91/// agent_handle.send(request).await;
92///
93/// // Wait for response
94/// let response = rx.await.expect("Agent dropped response channel");
95/// ```
96#[must_use]
97pub fn create_request_reply<T>() -> (ResponseChannel<T>, oneshot::Receiver<T>) {
98 let (tx, rx) = oneshot::channel();
99 (Arc::new(Mutex::new(Some(tx))), rx)
100}
101
102/// Send a response through a response channel
103///
104/// This function properly unwraps and uses the response channel to send a value
105/// back to the waiting web handler. The channel is consumed after sending.
106///
107/// # Parameters
108///
109/// * `response_tx` - The response channel from the request
110/// * `value` - The value to send back to the requester
111///
112/// # Returns
113///
114/// `Ok(())` if the response was successfully sent, or `Err(value)` if the
115/// receiver was dropped (which typically means the web handler timed out or
116/// the client disconnected).
117///
118/// # Errors
119///
120/// Returns `Err(value)` if:
121/// - The receiver was dropped (client disconnected or handler timed out)
122/// - The channel was already used
123///
124/// # Example
125///
126/// ```rust
127/// use crate::agents::request_reply::send_response;
128///
129/// async fn handle_request(request: MyRequest) {
130/// let result = perform_operation();
131///
132/// // Send response back to web handler
133/// if send_response(request.response_tx, result).await.is_err() {
134/// tracing::warn!("Client disconnected before receiving response");
135/// }
136/// }
137/// ```
138pub async fn send_response<T>(response_tx: ResponseChannel<T>, value: T) -> Result<(), T> {
139 // Take the sender from the Arc<Mutex<Option<...>>>
140 // Avoid holding the lock across the send operation
141 let tx = response_tx.lock().await.take();
142 if let Some(tx) = tx {
143 // Send the value through the oneshot channel
144 tx.send(value)
145 } else {
146 // Channel was already used or dropped
147 Err(value)
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// A value sent by the agent side arrives at the awaiting receiver.
    #[tokio::test]
    async fn test_create_request_reply() {
        let (response_tx, rx) = create_request_reply::<String>();

        // Agent side: respond through the wrapped sender.
        let sent = send_response(response_tx, "test response".to_string()).await;
        assert_eq!(sent, Ok(()));

        // Web handler side: the awaited receiver yields the same value.
        let received = rx.await.expect("Should receive response");
        assert_eq!(received, "test response");
    }

    /// With the receiver gone, the unsent value is handed back as `Err`.
    #[tokio::test]
    async fn test_send_response_with_dropped_receiver() {
        let (response_tx, rx) = create_request_reply::<String>();

        // Simulate a client disconnect before the agent answers.
        drop(rx);

        let outcome = send_response(response_tx, "test response".to_string()).await;
        assert_eq!(outcome, Err("test response".to_string()));
    }

    /// Only the first response on a (cloned) channel is delivered.
    #[tokio::test]
    async fn test_send_response_twice_fails() {
        let (response_tx, rx) = create_request_reply::<String>();
        let second_handle = response_tx.clone();

        // The first responder takes the sender and succeeds.
        assert!(send_response(response_tx, "first".to_string()).await.is_ok());

        // The clone shares the now-empty slot, so a second response is rejected.
        assert!(send_response(second_handle, "second".to_string()).await.is_err());

        // The receiver observes only the first value.
        assert_eq!(rx.await.expect("Should receive response"), "first");
    }
}