Skip to main content

sh_layer4/channel_gateway/adapter/
http.rs

1//! # HTTP Channel Adapter
2//!
3//! HTTP REST API 渠道适配器。
4
5use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::VecDeque;
8use std::time::Duration;
9
10use crate::channel_gateway::{Channel, ChannelType, InboundMessage, OutboundMessage};
11use crate::types::Layer4Result;
12
13use reqwest::Client;
14use std::collections::HashMap;
15
/// Configuration for an HTTP channel.
///
/// Equality is derived so configurations can be compared directly, for
/// example in tests or when checking whether a reloaded configuration
/// actually changed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpChannelConfig {
    /// Base URL that request paths are appended to.
    pub base_url: String,
    /// Request timeout applied to the HTTP client, in milliseconds.
    pub timeout_ms: u64,
    /// Extra headers added to every request issued through the channel's
    /// `post` / `get` helpers.
    pub headers: HashMap<String, String>,
    /// Maximum number of send attempts per request (at least one attempt is
    /// always made).
    pub retry_attempts: u32,
    /// Fixed delay between two consecutive attempts, in milliseconds.
    pub retry_interval_ms: u64,
}
25
26impl Default for HttpChannelConfig {
27    fn default() -> Self {
28        Self {
29            base_url: "http://localhost:8080".to_string(),
30            timeout_ms: 30000,
31            headers: HashMap::new(),
32            retry_attempts: 3,
33            retry_interval_ms: 1000,
34        }
35    }
36}
37
38/// HTTP 渠道适配器
39pub struct HttpChannel {
40    channel_id: String,
41    config: HttpChannelConfig,
42    connected: RwLock<bool>,
43    request_queue: RwLock<VecDeque<InboundMessage>>,
44    /// HTTP 客户端
45    client: Client,
46}
47
48impl HttpChannel {
49    /// 创建新的 HTTP 渠道
50    pub fn new(channel_id: impl Into<String>, config: HttpChannelConfig) -> Self {
51        let client = Client::builder()
52            .timeout(Duration::from_millis(config.timeout_ms))
53            .build()
54            .expect("Failed to create HTTP client");
55
56        Self {
57            channel_id: channel_id.into(),
58            config,
59            connected: RwLock::new(false),
60            request_queue: RwLock::new(VecDeque::new()),
61            client,
62        }
63    }
64
65    /// 创建默认 HTTP 渠道
66    pub fn default_channel() -> Self {
67        Self::new("http-default", HttpChannelConfig::default())
68    }
69
70    /// 初始化连接(验证服务可用性)
71    pub async fn connect(&self) -> Layer4Result<()> {
72        let url = format!("{}/health", self.config.base_url);
73
74        match self.client.get(&url).send().await {
75            Ok(response) => {
76                if response.status().is_success() {
77                    *self.connected.write() = true;
78                    tracing::info!("HTTP channel connected to {}", self.config.base_url);
79                    Ok(())
80                } else {
81                    tracing::warn!("HTTP health check returned {}", response.status());
82                    // 即使健康检查失败,也认为连接可用(可能是没有健康检查端点)
83                    *self.connected.write() = true;
84                    Ok(())
85                }
86            }
87            Err(e) => {
88                // 网络错误时,仍然允许发送(可能在后台重连)
89                tracing::warn!("HTTP connection check failed: {}, proceeding anyway", e);
90                *self.connected.write() = true;
91                Ok(())
92            }
93        }
94    }
95
96    /// 发送 HTTP POST 请求
97    pub async fn post(
98        &self,
99        path: &str,
100        body: &serde_json::Value,
101    ) -> Layer4Result<serde_json::Value> {
102        let url = format!("{}{}", self.config.base_url, path);
103
104        let mut request = self.client.post(&url).json(body);
105
106        // 添加配置的 headers
107        for (key, value) in &self.config.headers {
108            request = request.header(key, value);
109        }
110
111        let response = self.send_with_retry(request).await?;
112        let result = response.json().await?;
113
114        Ok(result)
115    }
116
117    /// 发送 HTTP GET 请求
118    pub async fn get(&self, path: &str) -> Layer4Result<serde_json::Value> {
119        let url = format!("{}{}", self.config.base_url, path);
120
121        let mut request = self.client.get(&url);
122
123        // 添加配置的 headers
124        for (key, value) in &self.config.headers {
125            request = request.header(key, value);
126        }
127
128        let response = self.send_with_retry(request).await?;
129        let result = response.json().await?;
130
131        Ok(result)
132    }
133
134    /// 带重试的发送
135    async fn send_with_retry(
136        &self,
137        request: reqwest::RequestBuilder,
138    ) -> Layer4Result<reqwest::Response> {
139        let mut attempts = 0;
140        let max_attempts = self.config.retry_attempts;
141        let interval = Duration::from_millis(self.config.retry_interval_ms);
142
143        loop {
144            let request_clone = request
145                .try_clone()
146                .ok_or_else(|| anyhow::anyhow!("Failed to clone request for retry"))?;
147
148            match request_clone.send().await {
149                Ok(response) => {
150                    let status = response.status();
151                    if status.is_success() || status.is_client_error() {
152                        return Ok(response);
153                    }
154                    // 服务端错误,重试
155                    attempts += 1;
156                    if attempts >= max_attempts {
157                        return Err(anyhow::anyhow!(
158                            "HTTP request failed after {} attempts: {}",
159                            max_attempts,
160                            status
161                        ));
162                    }
163                    tracing::warn!(
164                        "HTTP request failed with {}, retrying ({}/{})",
165                        status,
166                        attempts,
167                        max_attempts
168                    );
169                    tokio::time::sleep(interval).await;
170                }
171                Err(e) => {
172                    attempts += 1;
173                    if attempts >= max_attempts {
174                        return Err(anyhow::anyhow!("HTTP request failed: {}", e));
175                    }
176                    tracing::warn!(
177                        "HTTP request error: {}, retrying ({}/{})",
178                        e,
179                        attempts,
180                        max_attempts
181                    );
182                    tokio::time::sleep(interval).await;
183                }
184            }
185        }
186    }
187
188    /// 处理 HTTP 请求(模拟接收)
189    pub fn handle_request(&self, user_id: &str, content: &str) {
190        let message = InboundMessage::new(&self.channel_id, user_id, content).with_metadata(
191            serde_json::json!({
192                "source": "http",
193                "method": "POST"
194            }),
195        );
196        self.request_queue.write().push_back(message);
197    }
198
199    /// 处理带会话的请求
200    pub fn handle_request_with_session(&self, user_id: &str, content: &str, session_id: &str) {
201        let message = InboundMessage::new(&self.channel_id, user_id, content)
202            .with_session(session_id)
203            .with_metadata(serde_json::json!({
204                "source": "http",
205                "method": "POST"
206            }));
207        self.request_queue.write().push_back(message);
208    }
209
210    /// 获取基础 URL
211    pub fn base_url(&self) -> &str {
212        &self.config.base_url
213    }
214}
215
216#[async_trait]
217impl Channel for HttpChannel {
218    fn id(&self) -> &str {
219        &self.channel_id
220    }
221
222    fn channel_type(&self) -> ChannelType {
223        ChannelType::Http
224    }
225
226    async fn send(&self, message: &OutboundMessage) -> Layer4Result<()> {
227        if !*self.connected.read() {
228            return Err(anyhow::anyhow!("Channel not connected"));
229        }
230
231        // 构建 HTTP POST 请求体
232        let body = serde_json::json!({
233            "message_id": message.message_id,
234            "content": message.content,
235            "message_type": message.message_type,
236            "target": message.target,
237            "metadata": message.metadata,
238            "timestamp": message.timestamp.to_rfc3339(),
239        });
240
241        // 发送 POST 请求到 /api/message 端点
242        let path = "/api/message";
243        let result = self.post(path, &body).await?;
244
245        tracing::debug!(
246            "HTTP channel sent message {}, response: {:?}",
247            message.message_id,
248            result
249        );
250        Ok(())
251    }
252
253    async fn try_receive(&self) -> Layer4Result<Option<InboundMessage>> {
254        if !*self.connected.read() {
255            return Err(anyhow::anyhow!("Channel not connected"));
256        }
257
258        Ok(self.request_queue.write().pop_front())
259    }
260
261    fn is_connected(&self) -> bool {
262        *self.connected.read()
263    }
264
265    async fn close(&self) -> Layer4Result<()> {
266        *self.connected.write() = false;
267        self.request_queue.write().clear();
268        tracing::info!("HTTP channel closed");
269        Ok(())
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// A default channel carries the default id and starts disconnected.
    #[test]
    fn test_http_channel_creation() {
        let ch = HttpChannel::default_channel();
        assert_eq!(ch.id(), "http-default");
        assert!(!ch.is_connected());
    }

    /// The default configuration exposes the documented timeout and retry values.
    #[test]
    fn test_http_config_default() {
        let HttpChannelConfig {
            timeout_ms,
            retry_attempts,
            retry_interval_ms,
            ..
        } = HttpChannelConfig::default();

        assert_eq!(timeout_ms, 30000);
        assert_eq!(retry_attempts, 3);
        assert_eq!(retry_interval_ms, 1000);
    }

    /// Handling one request leaves exactly one message in the inbound queue.
    #[test]
    fn test_http_channel_handle_request() {
        let ch = HttpChannel::default_channel();
        // Flip the connection flag by hand; no server is contacted here.
        *ch.connected.write() = true;
        ch.handle_request("user-1", "POST /api/agent");

        assert_eq!(ch.request_queue.read().len(), 1);
    }

    /// The accessor returns the base URL from the default configuration.
    #[test]
    fn test_http_channel_base_url() {
        let ch = HttpChannel::default_channel();
        assert_eq!(ch.base_url(), "http://localhost:8080");
    }

    /// Closing a connected channel succeeds and clears the connection flag.
    #[tokio::test]
    async fn test_http_channel_close() {
        let ch = HttpChannel::default_channel();
        *ch.connected.write() = true;

        assert!(ch.close().await.is_ok());
        assert!(!ch.is_connected());
    }

    /// Sending on a channel that was never connected must fail.
    #[tokio::test]
    async fn test_send_without_connection() {
        let ch = HttpChannel::default_channel();
        let outbound = OutboundMessage::to_user("test-user", "hello");

        assert!(ch.send(&outbound).await.is_err());
    }
}