use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use async_trait::async_trait;
use tokio::sync::{mpsc, RwLock};
use url::Url;
use log::{info, warn};
use reqwest::{Client as HttpClient, header};
use crate::error::Error;
use crate::protocol::JSONRPCMessage;
use super::Transport;
pub struct HttpTransport {
url: String,
connected: Arc<AtomicBool>,
client: HttpClient,
receiver: Arc<RwLock<Option<mpsc::Receiver<Result<JSONRPCMessage, Error>>>>>,
}
impl HttpTransport {
pub fn new(url: &str) -> Result<Self, Error> {
let url_parsed = Url::parse(url).map_err(Error::from)?;
if url_parsed.scheme() != "http" && url_parsed.scheme() != "https" {
return Err(Error::UrlError(format!(
"Invalid URL scheme: {}. Expected http or https",
url_parsed.scheme()
)));
}
let mut headers = header::HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
);
headers.insert(
header::ACCEPT,
header::HeaderValue::from_static("application/json"),
);
let client = HttpClient::builder()
.default_headers(headers)
.build()
.map_err(|e| Error::TransportError(format!("Failed to create HTTP client: {}", e)))?;
Ok(Self {
url: url.to_string(),
connected: Arc::new(AtomicBool::new(false)),
client,
receiver: Arc::new(RwLock::new(None)),
})
}
}
#[async_trait]
impl Transport for HttpTransport {
async fn connect(&self) -> Result<(), Error> {
if self.connected.load(Ordering::SeqCst) {
return Ok(());
}
let response = self.client
.head(&self.url)
.send()
.await
.map_err(|e| Error::TransportError(format!("HTTP connection failed: {}", e)))?;
if !response.status().is_success() {
return Err(Error::TransportError(format!(
"HTTP connection failed with status: {}",
response.status()
)));
}
info!("HTTP transport connected to {}", self.url);
let (_incoming_tx, incoming_rx) = mpsc::channel::<Result<JSONRPCMessage, Error>>(100);
{
let mut receiver = self.receiver.write().await;
*receiver = Some(incoming_rx);
}
self.connected.store(true, Ordering::SeqCst);
Ok(())
}
async fn disconnect(&self) -> Result<(), Error> {
if !self.connected.load(Ordering::SeqCst) {
return Ok(());
}
{
let mut receiver = self.receiver.write().await;
*receiver = None;
}
self.connected.store(false, Ordering::SeqCst);
info!("HTTP transport disconnected");
Ok(())
}
async fn send(&self, message: JSONRPCMessage) -> Result<(), Error> {
if !self.connected.load(Ordering::SeqCst) {
return Err(Error::ConnectionClosed("Not connected".to_string()));
}
let json = serde_json::to_string(&message)
.map_err(|e| Error::JsonError(e.to_string()))?;
let response = self.client
.post(&self.url)
.body(json)
.send()
.await
.map_err(|e| Error::TransportError(format!("HTTP request failed: {}", e)))?;
if !response.status().is_success() {
return Err(Error::TransportError(format!(
"HTTP request failed with status: {}",
response.status()
)));
}
let response_body = response
.text()
.await
.map_err(|e| Error::TransportError(format!("Failed to read HTTP response: {}", e)))?;
let response_message: JSONRPCMessage = serde_json::from_str(&response_body)
.map_err(|e| Error::JsonError(format!("Failed to parse response: {}", e)))?;
if let Some(_receiver) = &self.receiver.read().await.as_ref() {
let (incoming_tx, _) = mpsc::channel::<Result<JSONRPCMessage, Error>>(1);
if let Err(_) = incoming_tx.send(Ok(response_message)).await {
warn!("Failed to send response to channel - receiver may have been dropped");
}
}
Ok(())
}
async fn receive(&self) -> Option<Result<JSONRPCMessage, Error>> {
if !self.connected.load(Ordering::SeqCst) {
return Some(Err(Error::ConnectionClosed("Not connected".to_string())));
}
let mut receiver = self.receiver.write().await;
let receiver = receiver.as_mut()?;
receiver.recv().await
}
async fn is_connected(&self) -> bool {
self.connected.load(Ordering::SeqCst)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A valid http URL yields a disconnected transport that remembers the
    /// URL; unparsable URLs and non-http(s) schemes are rejected.
    #[tokio::test]
    async fn test_http_transport_new() {
        let endpoint = "http://localhost:8080";
        let transport = HttpTransport::new(endpoint).unwrap();
        assert_eq!(transport.url, endpoint);
        assert!(!transport.is_connected().await);

        // First entry is not a URL at all; second has an unsupported scheme.
        for rejected in ["invalid-url", "ftp://localhost:8080"] {
            assert!(HttpTransport::new(rejected).is_err());
        }
    }
}