use serde_json::Value;
use std::collections::HashMap;
use std::sync::{LazyLock, RwLock};
use tokio::sync::oneshot;
use super::types::BidirError;
/// Sending half of the one-shot channel through which a response payload is delivered.
type ResponseSender = oneshot::Sender<Value>;

/// Global registry that maps a request ID to the sender registered for that request.
///
/// The map starts out empty and is created lazily on first access.
static PENDING_RESPONSES: LazyLock<RwLock<HashMap<String, ResponseSender>>> =
    LazyLock::new(RwLock::default);
/// Stores `sender` in the global registry under `request_id`.
///
/// If an entry with the same ID is already present it is overwritten, and the
/// sender it held is dropped.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn register_pending_request(request_id: String, sender: ResponseSender) {
    PENDING_RESPONSES
        .write()
        .unwrap()
        .insert(request_id, sender);
}
/// Removes the entry for `request_id` from the global registry.
///
/// Returns the sender that was stored under that ID, or `None` when no such
/// entry exists.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn unregister_pending_request(request_id: &str) -> Option<ResponseSender> {
    PENDING_RESPONSES.write().unwrap().remove(request_id)
}
/// Delivers `response_data` to the sender registered under `request_id`.
///
/// The entry is removed from the registry whether or not delivery succeeds.
///
/// # Errors
///
/// * [`BidirError::UnknownRequest`] when nothing is registered under `request_id`.
/// * [`BidirError::ChannelClosed`] when the receiving half of the channel has
///   already been dropped.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn handle_pending_response(request_id: &str, response_data: Value) -> Result<(), BidirError> {
    // The write guard is a temporary of this statement, so the lock is already
    // released by the time the payload is sent below.
    let claimed = PENDING_RESPONSES.write().unwrap().remove(request_id);

    let Some(tx) = claimed else {
        return Err(BidirError::UnknownRequest);
    };

    // A failed send hands the payload back; it is discarded here.
    tx.send(response_data)
        .map_err(|_| BidirError::ChannelClosed)
}
/// Reports whether the global registry currently holds an entry for `request_id`.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn is_request_pending(request_id: &str) -> bool {
    PENDING_RESPONSES.read().unwrap().contains_key(request_id)
}
/// Returns the number of entries currently held in the global registry.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
pub fn pending_count() -> usize {
    PENDING_RESPONSES.read().unwrap().len()
}
/// Empties the global registry, dropping every stored sender.
///
/// Only compiled in test builds.
///
/// # Panics
///
/// Panics if the registry lock has been poisoned.
#[cfg(test)]
// Kept available as a test utility even when no test currently calls it.
#[allow(dead_code)]
pub fn clear_all() {
    PENDING_RESPONSES.write().unwrap().clear();
}
#[cfg(test)]
mod tests {
    use super::*;

    // Every test appends a fresh UUID to its request ID: the registry is a single
    // global map, so distinct IDs keep the tests from touching each other's entries.

    /// A registered request receives the payload handed to
    /// `handle_pending_response` and is no longer pending afterwards.
    #[tokio::test]
    async fn test_register_and_handle() {
        let id = format!("test-reg-handle-{}", uuid::Uuid::new_v4());
        let (sender, receiver) = oneshot::channel();

        register_pending_request(id.clone(), sender);
        assert!(is_request_pending(&id));

        let payload = serde_json::json!({"confirmed": true});
        handle_pending_response(&id, payload.clone()).unwrap();

        let delivered = receiver.await.unwrap();
        assert_eq!(delivered, payload);
        assert!(!is_request_pending(&id));
    }

    /// Responding to an ID that was never registered yields `UnknownRequest`.
    #[tokio::test]
    async fn test_unknown_request() {
        let missing_id = format!("nonexistent-{}", uuid::Uuid::new_v4());

        let outcome = handle_pending_response(&missing_id, serde_json::json!({}));

        assert!(matches!(outcome, Err(BidirError::UnknownRequest)));
    }

    /// Unregistering hands back the stored sender and clears the entry.
    #[tokio::test]
    async fn test_unregister() {
        let id = format!("test-unreg-{}", uuid::Uuid::new_v4());
        let (sender, _receiver) = oneshot::channel();

        register_pending_request(id.clone(), sender);
        assert!(is_request_pending(&id));

        let taken = unregister_pending_request(&id);
        assert!(taken.is_some());
        assert!(!is_request_pending(&id));
    }

    /// Responding after the receiver has been dropped yields `ChannelClosed`.
    #[tokio::test]
    async fn test_channel_closed() {
        let id = format!("test-closed-{}", uuid::Uuid::new_v4());
        let (sender, receiver) = oneshot::channel();

        register_pending_request(id.clone(), sender);
        // Close the receiving side before any response arrives.
        drop(receiver);

        let outcome = handle_pending_response(&id, serde_json::json!({}));
        assert!(matches!(outcome, Err(BidirError::ChannelClosed)));
    }
}