acton_htmx/agents/
request_reply.rs

1//! Request-reply pattern helpers for web handler agents
2//!
3//! This module provides utilities for implementing the request-reply pattern
4//! in acton-reactive agents that need to respond synchronously to web handler
5//! requests. This pattern is commonly used when Axum extractors need to wait
6//! for agent responses.
7//!
8//! # Why This Pattern?
9//!
10//! Axum extractors run synchronously (from the handler's perspective) but need
11//! to communicate with asynchronous agents. The oneshot channel provides a way
12//! to:
13//! 1. Send a request to an agent
14//! 2. Await the agent's response
15//! 3. Return the result to the extractor
16//!
17//! The `Arc<Mutex<Option<...>>>` wrapping is required because:
18//! - `Arc`: Messages must be cloneable for agent supervision/retry
19//! - `Mutex`: Interior mutability to take the sender when responding
20//! - `Option`: Allows taking ownership of the sender exactly once
21//!
22//! # Example Usage
23//!
24//! ```rust
25//! use acton_reactive::prelude::*;
26//! use crate::agents::request_reply::{create_request_reply, send_response, ResponseChannel};
27//! use crate::auth::session::SessionId;
28//!
29//! #[derive(Clone, Debug)]
30//! pub struct GetSessionRequest {
31//!     pub session_id: SessionId,
32//!     pub response_tx: ResponseChannel<Option<SessionData>>,
33//! }
34//!
35//! impl GetSessionRequest {
36//!     pub fn new(session_id: SessionId) -> (Self, oneshot::Receiver<Option<SessionData>>) {
37//!         let (response_tx, rx) = create_request_reply();
38//!         let request = Self { session_id, response_tx };
39//!         (request, rx)
40//!     }
41//! }
42//!
43//! // In the agent's message handler:
44//! async fn handle_get_session(request: GetSessionRequest) {
45//!     let session = load_session(&request.session_id);
46//!     let _ = send_response(request.response_tx, session).await;
47//! }
48//! ```
49
50use std::sync::Arc;
51use tokio::sync::{oneshot, Mutex};
52
53/// Standard response channel type for web handler requests
54///
55/// This type wraps a oneshot sender in `Arc<Mutex<Option<...>>>` to satisfy
56/// the requirements of acton-reactive message passing:
57/// - `Arc`: Messages must be `Clone` for agent supervision/retry
58/// - `Mutex`: Provides interior mutability to take ownership of the sender
59/// - `Option`: Allows taking the sender exactly once when responding
60///
61/// # Type Parameter
62///
63/// * `T` - The type of value to send through the channel
64pub type ResponseChannel<T> = Arc<Mutex<Option<oneshot::Sender<T>>>>;
65
66/// Create a request-reply pair with proper channel wrapping
67///
68/// This is a convenience function that creates both sides of a request-reply
69/// communication pattern. The sender is wrapped in `Arc<Mutex<Option<...>>>`
70/// for use in agent messages, while the receiver is returned directly for
71/// awaiting the response.
72///
73/// # Returns
74///
75/// A tuple of `(ResponseChannel<T>, oneshot::Receiver<T>)` where:
76/// - The `ResponseChannel` should be included in your request message
77/// - The `Receiver` should be awaited by the web handler
78///
79/// # Example
80///
81/// ```rust
82/// use crate::agents::request_reply::create_request_reply;
83///
84/// let (response_tx, rx) = create_request_reply();
85/// let request = MyRequest {
86///     data: "foo".to_string(),
87///     response_tx,
88/// };
89///
90/// // Send request to agent
91/// agent_handle.send(request).await;
92///
93/// // Wait for response
94/// let response = rx.await.expect("Agent dropped response channel");
95/// ```
96#[must_use]
97pub fn create_request_reply<T>() -> (ResponseChannel<T>, oneshot::Receiver<T>) {
98    let (tx, rx) = oneshot::channel();
99    (Arc::new(Mutex::new(Some(tx))), rx)
100}
101
102/// Send a response through a response channel
103///
104/// This function properly unwraps and uses the response channel to send a value
105/// back to the waiting web handler. The channel is consumed after sending.
106///
107/// # Parameters
108///
109/// * `response_tx` - The response channel from the request
110/// * `value` - The value to send back to the requester
111///
112/// # Returns
113///
114/// `Ok(())` if the response was successfully sent, or `Err(value)` if the
115/// receiver was dropped (which typically means the web handler timed out or
116/// the client disconnected).
117///
118/// # Errors
119///
120/// Returns `Err(value)` if:
121/// - The receiver was dropped (client disconnected or handler timed out)
122/// - The channel was already used
123///
124/// # Example
125///
126/// ```rust
127/// use crate::agents::request_reply::send_response;
128///
129/// async fn handle_request(request: MyRequest) {
130///     let result = perform_operation();
131///
132///     // Send response back to web handler
133///     if send_response(request.response_tx, result).await.is_err() {
134///         tracing::warn!("Client disconnected before receiving response");
135///     }
136/// }
137/// ```
138pub async fn send_response<T>(response_tx: ResponseChannel<T>, value: T) -> Result<(), T> {
139    // Take the sender from the Arc<Mutex<Option<...>>>
140    // Avoid holding the lock across the send operation
141    let tx = response_tx.lock().await.take();
142    if let Some(tx) = tx {
143        // Send the value through the oneshot channel
144        tx.send(value)
145    } else {
146        // Channel was already used or dropped
147        Err(value)
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    #[tokio::test]
156    async fn test_create_request_reply() {
157        let (response_tx, rx) = create_request_reply::<String>();
158
159        // Simulate sending response from agent
160        let result = send_response(response_tx, "test response".to_string()).await;
161        assert!(result.is_ok());
162
163        // Simulate receiving in web handler
164        let response = rx.await.expect("Should receive response");
165        assert_eq!(response, "test response");
166    }
167
168    #[tokio::test]
169    async fn test_send_response_with_dropped_receiver() {
170        let (response_tx, rx) = create_request_reply::<String>();
171
172        // Drop the receiver (simulating client disconnect)
173        drop(rx);
174
175        // Try to send - should fail
176        let result = send_response(response_tx, "test response".to_string()).await;
177        assert!(result.is_err());
178        assert_eq!(result.unwrap_err(), "test response");
179    }
180
181    #[tokio::test]
182    async fn test_send_response_twice_fails() {
183        let (response_tx, rx) = create_request_reply::<String>();
184
185        // First send should succeed
186        let response_tx_clone = response_tx.clone();
187        let result1 = send_response(response_tx, "first".to_string()).await;
188        assert!(result1.is_ok());
189
190        // Second send with cloned channel should fail (sender already taken)
191        let result2 = send_response(response_tx_clone, "second".to_string()).await;
192        assert!(result2.is_err());
193
194        // Receiver should get the first value
195        let response = rx.await.expect("Should receive response");
196        assert_eq!(response, "first");
197    }
198}