sh_layer4/channel_gateway/adapter/
http.rs1use async_trait::async_trait;
6use parking_lot::RwLock;
7use std::collections::VecDeque;
8use std::time::Duration;
9
10use crate::channel_gateway::{Channel, ChannelType, InboundMessage, OutboundMessage};
11use crate::types::Layer4Result;
12
13use reqwest::Client;
14use std::collections::HashMap;
15
/// Connection settings for an [`HttpChannel`].
///
/// Derives `PartialEq`/`Eq` so configurations can be compared directly
/// (for example with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpChannelConfig {
    /// Base URL that request paths are appended to.
    pub base_url: String,
    /// Timeout handed to the HTTP client builder, in milliseconds.
    pub timeout_ms: u64,
    /// Extra headers attached to every `post`/`get` request.
    pub headers: HashMap<String, String>,
    /// Maximum number of send attempts made for a single request.
    pub retry_attempts: u32,
    /// Pause between two consecutive attempts, in milliseconds.
    pub retry_interval_ms: u64,
}
25
26impl Default for HttpChannelConfig {
27 fn default() -> Self {
28 Self {
29 base_url: "http://localhost:8080".to_string(),
30 timeout_ms: 30000,
31 headers: HashMap::new(),
32 retry_attempts: 3,
33 retry_interval_ms: 1000,
34 }
35 }
36}
37
/// Channel implementation that sends outbound messages over HTTP and
/// buffers inbound messages in an in-memory FIFO queue.
pub struct HttpChannel {
    /// Identifier returned by `id()` and stamped on every inbound message.
    channel_id: String,
    /// Endpoint, timeout, header and retry settings.
    config: HttpChannelConfig,
    /// Connection flag: set by `connect`, cleared by `close`.
    connected: RwLock<bool>,
    /// Inbound messages pushed by `handle_request*` and popped by `try_receive`.
    request_queue: RwLock<VecDeque<InboundMessage>>,
    /// HTTP client built with the configured timeout.
    client: Client,
}
47
48impl HttpChannel {
49 pub fn new(channel_id: impl Into<String>, config: HttpChannelConfig) -> Self {
51 let client = Client::builder()
52 .timeout(Duration::from_millis(config.timeout_ms))
53 .build()
54 .expect("Failed to create HTTP client");
55
56 Self {
57 channel_id: channel_id.into(),
58 config,
59 connected: RwLock::new(false),
60 request_queue: RwLock::new(VecDeque::new()),
61 client,
62 }
63 }
64
65 pub fn default_channel() -> Self {
67 Self::new("http-default", HttpChannelConfig::default())
68 }
69
70 pub async fn connect(&self) -> Layer4Result<()> {
72 let url = format!("{}/health", self.config.base_url);
73
74 match self.client.get(&url).send().await {
75 Ok(response) => {
76 if response.status().is_success() {
77 *self.connected.write() = true;
78 tracing::info!("HTTP channel connected to {}", self.config.base_url);
79 Ok(())
80 } else {
81 tracing::warn!("HTTP health check returned {}", response.status());
82 *self.connected.write() = true;
84 Ok(())
85 }
86 }
87 Err(e) => {
88 tracing::warn!("HTTP connection check failed: {}, proceeding anyway", e);
90 *self.connected.write() = true;
91 Ok(())
92 }
93 }
94 }
95
96 pub async fn post(
98 &self,
99 path: &str,
100 body: &serde_json::Value,
101 ) -> Layer4Result<serde_json::Value> {
102 let url = format!("{}{}", self.config.base_url, path);
103
104 let mut request = self.client.post(&url).json(body);
105
106 for (key, value) in &self.config.headers {
108 request = request.header(key, value);
109 }
110
111 let response = self.send_with_retry(request).await?;
112 let result = response.json().await?;
113
114 Ok(result)
115 }
116
117 pub async fn get(&self, path: &str) -> Layer4Result<serde_json::Value> {
119 let url = format!("{}{}", self.config.base_url, path);
120
121 let mut request = self.client.get(&url);
122
123 for (key, value) in &self.config.headers {
125 request = request.header(key, value);
126 }
127
128 let response = self.send_with_retry(request).await?;
129 let result = response.json().await?;
130
131 Ok(result)
132 }
133
134 async fn send_with_retry(
136 &self,
137 request: reqwest::RequestBuilder,
138 ) -> Layer4Result<reqwest::Response> {
139 let mut attempts = 0;
140 let max_attempts = self.config.retry_attempts;
141 let interval = Duration::from_millis(self.config.retry_interval_ms);
142
143 loop {
144 let request_clone = request
145 .try_clone()
146 .ok_or_else(|| anyhow::anyhow!("Failed to clone request for retry"))?;
147
148 match request_clone.send().await {
149 Ok(response) => {
150 let status = response.status();
151 if status.is_success() || status.is_client_error() {
152 return Ok(response);
153 }
154 attempts += 1;
156 if attempts >= max_attempts {
157 return Err(anyhow::anyhow!(
158 "HTTP request failed after {} attempts: {}",
159 max_attempts,
160 status
161 ));
162 }
163 tracing::warn!(
164 "HTTP request failed with {}, retrying ({}/{})",
165 status,
166 attempts,
167 max_attempts
168 );
169 tokio::time::sleep(interval).await;
170 }
171 Err(e) => {
172 attempts += 1;
173 if attempts >= max_attempts {
174 return Err(anyhow::anyhow!("HTTP request failed: {}", e));
175 }
176 tracing::warn!(
177 "HTTP request error: {}, retrying ({}/{})",
178 e,
179 attempts,
180 max_attempts
181 );
182 tokio::time::sleep(interval).await;
183 }
184 }
185 }
186 }
187
188 pub fn handle_request(&self, user_id: &str, content: &str) {
190 let message = InboundMessage::new(&self.channel_id, user_id, content).with_metadata(
191 serde_json::json!({
192 "source": "http",
193 "method": "POST"
194 }),
195 );
196 self.request_queue.write().push_back(message);
197 }
198
199 pub fn handle_request_with_session(&self, user_id: &str, content: &str, session_id: &str) {
201 let message = InboundMessage::new(&self.channel_id, user_id, content)
202 .with_session(session_id)
203 .with_metadata(serde_json::json!({
204 "source": "http",
205 "method": "POST"
206 }));
207 self.request_queue.write().push_back(message);
208 }
209
210 pub fn base_url(&self) -> &str {
212 &self.config.base_url
213 }
214}
215
216#[async_trait]
217impl Channel for HttpChannel {
218 fn id(&self) -> &str {
219 &self.channel_id
220 }
221
222 fn channel_type(&self) -> ChannelType {
223 ChannelType::Http
224 }
225
226 async fn send(&self, message: &OutboundMessage) -> Layer4Result<()> {
227 if !*self.connected.read() {
228 return Err(anyhow::anyhow!("Channel not connected"));
229 }
230
231 let body = serde_json::json!({
233 "message_id": message.message_id,
234 "content": message.content,
235 "message_type": message.message_type,
236 "target": message.target,
237 "metadata": message.metadata,
238 "timestamp": message.timestamp.to_rfc3339(),
239 });
240
241 let path = "/api/message";
243 let result = self.post(path, &body).await?;
244
245 tracing::debug!(
246 "HTTP channel sent message {}, response: {:?}",
247 message.message_id,
248 result
249 );
250 Ok(())
251 }
252
253 async fn try_receive(&self) -> Layer4Result<Option<InboundMessage>> {
254 if !*self.connected.read() {
255 return Err(anyhow::anyhow!("Channel not connected"));
256 }
257
258 Ok(self.request_queue.write().pop_front())
259 }
260
261 fn is_connected(&self) -> bool {
262 *self.connected.read()
263 }
264
265 async fn close(&self) -> Layer4Result<()> {
266 *self.connected.write() = false;
267 self.request_queue.write().clear();
268 tracing::info!("HTTP channel closed");
269 Ok(())
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// A default channel carries the default id and starts disconnected.
    #[test]
    fn test_http_channel_creation() {
        let ch = HttpChannel::default_channel();

        assert_eq!(ch.id(), "http-default");
        assert!(!ch.is_connected());
    }

    /// The default configuration exposes the documented timeout and retry values.
    #[test]
    fn test_http_config_default() {
        let HttpChannelConfig {
            timeout_ms,
            retry_attempts,
            retry_interval_ms,
            ..
        } = HttpChannelConfig::default();

        assert_eq!(timeout_ms, 30000);
        assert_eq!(retry_attempts, 3);
        assert_eq!(retry_interval_ms, 1000);
    }

    /// Handling one request leaves exactly one message in the inbound queue.
    #[test]
    fn test_http_channel_handle_request() {
        let ch = HttpChannel::default_channel();
        *ch.connected.write() = true;

        ch.handle_request("user-1", "POST /api/agent");

        assert_eq!(ch.request_queue.read().len(), 1);
    }

    /// The accessor returns the base URL from the default configuration.
    #[test]
    fn test_http_channel_base_url() {
        let ch = HttpChannel::default_channel();

        assert_eq!(ch.base_url(), "http://localhost:8080");
    }

    /// Closing a connected channel clears the connection flag.
    #[tokio::test]
    async fn test_http_channel_close() {
        let ch = HttpChannel::default_channel();
        *ch.connected.write() = true;

        ch.close().await.unwrap();

        assert!(!ch.is_connected());
    }

    /// Sending on a channel that was never connected is rejected.
    #[tokio::test]
    async fn test_send_without_connection() {
        let ch = HttpChannel::default_channel();
        let outbound = OutboundMessage::to_user("test-user", "hello");

        let outcome = ch.send(&outbound).await;

        assert!(outcome.is_err());
    }
}