use async_trait::async_trait;
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::time::Duration;
use crate::channel_gateway::{Channel, ChannelType, InboundMessage, OutboundMessage};
use crate::types::Layer4Result;
use reqwest::Client;
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct HttpChannelConfig {
pub base_url: String,
pub timeout_ms: u64,
pub headers: HashMap<String, String>,
pub retry_attempts: u32,
pub retry_interval_ms: u64,
}
impl Default for HttpChannelConfig {
fn default() -> Self {
Self {
base_url: "http://localhost:8080".to_string(),
timeout_ms: 30000,
headers: HashMap::new(),
retry_attempts: 3,
retry_interval_ms: 1000,
}
}
}
pub struct HttpChannel {
channel_id: String,
config: HttpChannelConfig,
connected: RwLock<bool>,
request_queue: RwLock<VecDeque<InboundMessage>>,
client: Client,
}
impl HttpChannel {
pub fn new(channel_id: impl Into<String>, config: HttpChannelConfig) -> Self {
let client = Client::builder()
.timeout(Duration::from_millis(config.timeout_ms))
.build()
.expect("Failed to create HTTP client");
Self {
channel_id: channel_id.into(),
config,
connected: RwLock::new(false),
request_queue: RwLock::new(VecDeque::new()),
client,
}
}
pub fn default_channel() -> Self {
Self::new("http-default", HttpChannelConfig::default())
}
pub async fn connect(&self) -> Layer4Result<()> {
let url = format!("{}/health", self.config.base_url);
match self.client.get(&url).send().await {
Ok(response) => {
if response.status().is_success() {
*self.connected.write() = true;
tracing::info!("HTTP channel connected to {}", self.config.base_url);
Ok(())
} else {
tracing::warn!("HTTP health check returned {}", response.status());
*self.connected.write() = true;
Ok(())
}
}
Err(e) => {
tracing::warn!("HTTP connection check failed: {}, proceeding anyway", e);
*self.connected.write() = true;
Ok(())
}
}
}
pub async fn post(
&self,
path: &str,
body: &serde_json::Value,
) -> Layer4Result<serde_json::Value> {
let url = format!("{}{}", self.config.base_url, path);
let mut request = self.client.post(&url).json(body);
for (key, value) in &self.config.headers {
request = request.header(key, value);
}
let response = self.send_with_retry(request).await?;
let result = response.json().await?;
Ok(result)
}
pub async fn get(&self, path: &str) -> Layer4Result<serde_json::Value> {
let url = format!("{}{}", self.config.base_url, path);
let mut request = self.client.get(&url);
for (key, value) in &self.config.headers {
request = request.header(key, value);
}
let response = self.send_with_retry(request).await?;
let result = response.json().await?;
Ok(result)
}
async fn send_with_retry(
&self,
request: reqwest::RequestBuilder,
) -> Layer4Result<reqwest::Response> {
let mut attempts = 0;
let max_attempts = self.config.retry_attempts;
let interval = Duration::from_millis(self.config.retry_interval_ms);
loop {
let request_clone = request
.try_clone()
.ok_or_else(|| anyhow::anyhow!("Failed to clone request for retry"))?;
match request_clone.send().await {
Ok(response) => {
let status = response.status();
if status.is_success() || status.is_client_error() {
return Ok(response);
}
attempts += 1;
if attempts >= max_attempts {
return Err(anyhow::anyhow!(
"HTTP request failed after {} attempts: {}",
max_attempts,
status
));
}
tracing::warn!(
"HTTP request failed with {}, retrying ({}/{})",
status,
attempts,
max_attempts
);
tokio::time::sleep(interval).await;
}
Err(e) => {
attempts += 1;
if attempts >= max_attempts {
return Err(anyhow::anyhow!("HTTP request failed: {}", e));
}
tracing::warn!(
"HTTP request error: {}, retrying ({}/{})",
e,
attempts,
max_attempts
);
tokio::time::sleep(interval).await;
}
}
}
}
pub fn handle_request(&self, user_id: &str, content: &str) {
let message = InboundMessage::new(&self.channel_id, user_id, content).with_metadata(
serde_json::json!({
"source": "http",
"method": "POST"
}),
);
self.request_queue.write().push_back(message);
}
pub fn handle_request_with_session(&self, user_id: &str, content: &str, session_id: &str) {
let message = InboundMessage::new(&self.channel_id, user_id, content)
.with_session(session_id)
.with_metadata(serde_json::json!({
"source": "http",
"method": "POST"
}));
self.request_queue.write().push_back(message);
}
pub fn base_url(&self) -> &str {
&self.config.base_url
}
}
#[async_trait]
impl Channel for HttpChannel {
fn id(&self) -> &str {
&self.channel_id
}
fn channel_type(&self) -> ChannelType {
ChannelType::Http
}
async fn send(&self, message: &OutboundMessage) -> Layer4Result<()> {
if !*self.connected.read() {
return Err(anyhow::anyhow!("Channel not connected"));
}
let body = serde_json::json!({
"message_id": message.message_id,
"content": message.content,
"message_type": message.message_type,
"target": message.target,
"metadata": message.metadata,
"timestamp": message.timestamp.to_rfc3339(),
});
let path = "/api/message";
let result = self.post(path, &body).await?;
tracing::debug!(
"HTTP channel sent message {}, response: {:?}",
message.message_id,
result
);
Ok(())
}
async fn try_receive(&self) -> Layer4Result<Option<InboundMessage>> {
if !*self.connected.read() {
return Err(anyhow::anyhow!("Channel not connected"));
}
Ok(self.request_queue.write().pop_front())
}
fn is_connected(&self) -> bool {
*self.connected.read()
}
async fn close(&self) -> Layer4Result<()> {
*self.connected.write() = false;
self.request_queue.write().clear();
tracing::info!("HTTP channel closed");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_http_channel_creation() {
let channel = HttpChannel::default_channel();
assert_eq!(channel.id(), "http-default");
assert!(!channel.is_connected());
}
#[test]
fn test_http_config_default() {
let config = HttpChannelConfig::default();
assert_eq!(config.timeout_ms, 30000);
assert_eq!(config.retry_attempts, 3);
assert_eq!(config.retry_interval_ms, 1000);
}
#[test]
fn test_http_channel_handle_request() {
let channel = HttpChannel::default_channel();
*channel.connected.write() = true;
channel.handle_request("user-1", "POST /api/agent");
let count = channel.request_queue.read().len();
assert_eq!(count, 1);
}
#[test]
fn test_http_channel_base_url() {
let channel = HttpChannel::default_channel();
assert_eq!(channel.base_url(), "http://localhost:8080");
}
#[tokio::test]
async fn test_http_channel_close() {
let channel = HttpChannel::default_channel();
*channel.connected.write() = true;
channel.close().await.unwrap();
assert!(!channel.is_connected());
}
#[tokio::test]
async fn test_send_without_connection() {
let channel = HttpChannel::default_channel();
let msg = OutboundMessage::to_user("test-user", "hello");
let result = channel.send(&msg).await;
assert!(result.is_err());
}
}