1use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
6use arkflow_core::{Error, MessageBatch};
7use async_trait::async_trait;
8use reqwest::{header, Client};
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct HttpOutputConfig {
17 pub url: String,
19 pub method: String,
21 pub timeout_ms: u64,
23 pub retry_count: u32,
25 pub headers: Option<std::collections::HashMap<String, String>>,
27}
28
29pub struct HttpOutput {
31 config: HttpOutputConfig,
32 client: Arc<Mutex<Option<Client>>>,
33 connected: AtomicBool,
34}
35
36impl HttpOutput {
37 pub fn new(config: HttpOutputConfig) -> Result<Self, Error> {
39 Ok(Self {
40 config,
41 client: Arc::new(Mutex::new(None)),
42 connected: AtomicBool::new(false),
43 })
44 }
45}
46
47#[async_trait]
48impl Output for HttpOutput {
49 async fn connect(&self) -> Result<(), Error> {
50 let client_builder =
52 Client::builder().timeout(std::time::Duration::from_millis(self.config.timeout_ms));
53 let client_arc = self.client.clone();
54 client_arc.lock().await.replace(
55 client_builder.build().map_err(|e| {
56 Error::Connection(format!("Unable to create an HTTP client: {}", e))
57 })?,
58 );
59
60 self.connected.store(true, Ordering::SeqCst);
61 Ok(())
62 }
63
64 async fn write(&self, msg: &MessageBatch) -> Result<(), Error> {
65 let client_arc = self.client.clone();
66 let client_arc_guard = client_arc.lock().await;
67 if !self.connected.load(Ordering::SeqCst) || client_arc_guard.is_none() {
68 return Err(Error::Connection("The output is not connected".to_string()));
69 }
70
71 let client = client_arc_guard.as_ref().unwrap();
72 let content = msg.as_string()?;
73 if content.is_empty() {
74 return Ok(());
75 }
76 let body;
77 if content.len() == 1 {
78 body = content[0].clone();
79 } else {
80 body = serde_json::to_string(&content)
81 .map_err(|_| Error::Process("Unable to serialize message".to_string()))?;
82 }
83
84 let mut request_builder = match self.config.method.to_uppercase().as_str() {
86 "GET" => client.get(&self.config.url),
87 "POST" => client.post(&self.config.url).body(body), "PUT" => client.put(&self.config.url).body(body),
89 "DELETE" => client.delete(&self.config.url),
90 "PATCH" => client.patch(&self.config.url).body(body),
91 _ => {
92 return Err(Error::Config(format!(
93 "HTTP methods that are not supported: {}",
94 self.config.method
95 )))
96 }
97 };
98
99 if let Some(headers) = &self.config.headers {
101 for (key, value) in headers {
102 request_builder = request_builder.header(key, value);
103 }
104 }
105
106 if let Some(headers) = &self.config.headers {
109 if !headers.contains_key("Content-Type") {
110 request_builder = request_builder.header(header::CONTENT_TYPE, "application/json");
111 }
112 } else {
113 request_builder = request_builder.header(header::CONTENT_TYPE, "application/json");
114 }
115
116 let mut retry_count = 0;
118 let mut last_error = None;
119
120 while retry_count <= self.config.retry_count {
121 match request_builder.try_clone().unwrap().send().await {
122 Ok(response) => {
123 if response.status().is_success() {
124 return Ok(());
125 } else {
126 let status = response.status();
127 let body = response
128 .text()
129 .await
130 .unwrap_or_else(|_| "<Unable to read response body>".to_string());
131 last_error = Some(Error::Process(format!(
132 "HTTP Request Failed: Status code {}, response: {}",
133 status, body
134 )));
135 }
136 }
137 Err(e) => {
138 last_error = Some(Error::Connection(format!("HTTP request error: {}", e)));
139 }
140 }
141
142 retry_count += 1;
143 if retry_count <= self.config.retry_count {
144 tokio::time::sleep(std::time::Duration::from_millis(
146 100 * 2u64.pow(retry_count - 1),
147 ))
148 .await;
149 }
150 }
151
152 Err(last_error.unwrap_or_else(|| Error::Unknown("Unknown HTTP error".to_string())))
153 }
154
155 async fn close(&self) -> Result<(), Error> {
156 self.connected.store(false, Ordering::SeqCst);
157 let mut guard = self.client.lock().await;
158 *guard = None;
159 Ok(())
160 }
161}
162
163pub(crate) struct HttpOutputBuilder;
164impl OutputBuilder for HttpOutputBuilder {
165 fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
166 if config.is_none() {
167 return Err(Error::Config(
168 "HTTP output configuration is missing".to_string(),
169 ));
170 }
171 let config: HttpOutputConfig = serde_json::from_value(config.clone().unwrap())?;
172
173 Ok(Arc::new(HttpOutput::new(config)?))
174 }
175}
176
177pub fn init() {
178 register_output_builder("http", Arc::new(HttpOutputBuilder));
179}
180
181#[cfg(test)]
182mod tests {
183 use super::*;
184 use axum::{
185 extract::Json,
186 http::StatusCode,
187 response::IntoResponse,
188 routing::{delete, get, patch, post, put},
189 Router,
190 };
191 use serde_json::json;
192 use std::net::SocketAddr;
193
194 #[tokio::test]
196 async fn test_http_output_new() {
197 let config = HttpOutputConfig {
199 url: "http://example.com".to_string(),
200 method: "POST".to_string(),
201 timeout_ms: 5000,
202 retry_count: 3,
203 headers: None,
204 };
205
206 let output = HttpOutput::new(config);
208 assert!(output.is_ok(), "Failed to create HTTP output component");
209 }
210
211 #[tokio::test]
213 async fn test_http_output_connect() {
214 let config = HttpOutputConfig {
216 url: "http://example.com".to_string(),
217 method: "POST".to_string(),
218 timeout_ms: 5000,
219 retry_count: 3,
220 headers: None,
221 };
222
223 let output = HttpOutput::new(config).unwrap();
225 let result = output.connect().await;
226 assert!(result.is_ok(), "Failed to connect HTTP output");
227 assert!(
228 output.connected.load(Ordering::SeqCst),
229 "Connected flag not set"
230 );
231
232 let client_guard = output.client.lock().await;
234 assert!(client_guard.is_some(), "HTTP client not initialized");
235 }
236
237 #[tokio::test]
239 async fn test_http_output_close() {
240 let config = HttpOutputConfig {
242 url: "http://example.com".to_string(),
243 method: "POST".to_string(),
244 timeout_ms: 5000,
245 retry_count: 3,
246 headers: None,
247 };
248
249 let output = HttpOutput::new(config).unwrap();
251 output.connect().await.unwrap();
252 let result = output.close().await;
253
254 assert!(result.is_ok(), "Failed to close HTTP output");
255 assert!(
256 !output.connected.load(Ordering::SeqCst),
257 "Connected flag not reset"
258 );
259
260 let client_guard = output.client.lock().await;
262 assert!(client_guard.is_none(), "HTTP client not cleared");
263 }
264
265 #[tokio::test]
267 async fn test_http_output_write_without_connect() {
268 let config = HttpOutputConfig {
270 url: "http://example.com".to_string(),
271 method: "POST".to_string(),
272 timeout_ms: 5000,
273 retry_count: 3,
274 headers: None,
275 };
276
277 let output = HttpOutput::new(config).unwrap();
279 let msg = MessageBatch::from_string("test message");
280 let result = output.write(&msg).await;
281
282 assert!(result.is_err(), "Write should fail when not connected");
284 match result {
285 Err(Error::Connection(_)) => {} _ => panic!("Expected Connection error"),
287 }
288 }
289
290 #[tokio::test]
292 async fn test_http_output_write_empty_message() {
293 let config = HttpOutputConfig {
295 url: "http://example.com".to_string(),
296 method: "POST".to_string(),
297 timeout_ms: 5000,
298 retry_count: 3,
299 headers: None,
300 };
301
302 let output = HttpOutput::new(config).unwrap();
304 output.connect().await.unwrap();
305
306 let msg = MessageBatch::new_binary(vec![]);
308 let result = output.write(&msg).await;
309
310 assert!(result.is_ok(), "Write should succeed with empty message");
312 }
313
314 #[tokio::test]
316 async fn test_http_output_with_custom_headers() {
317 let mut headers = std::collections::HashMap::new();
319 headers.insert("X-Custom-Header".to_string(), "test-value".to_string());
320 headers.insert("Content-Type".to_string(), "application/xml".to_string());
321
322 let config = HttpOutputConfig {
323 url: "http://example.com".to_string(),
324 method: "POST".to_string(),
325 timeout_ms: 5000,
326 retry_count: 3,
327 headers: Some(headers),
328 };
329
330 let output = HttpOutput::new(config).unwrap();
332 output.connect().await.unwrap();
333
334 assert!(output.connected.load(Ordering::SeqCst));
337 }
338
339 #[tokio::test]
341 async fn test_http_output_unsupported_method() {
342 let config = HttpOutputConfig {
344 url: "http://example.com".to_string(),
345 method: "CONNECT".to_string(), timeout_ms: 5000,
347 retry_count: 3,
348 headers: None,
349 };
350
351 let output = HttpOutput::new(config).unwrap();
353 output.connect().await.unwrap();
354
355 let msg = MessageBatch::from_string("test message");
357 let result = output.write(&msg).await;
358
359 assert!(result.is_err(), "Write should fail with unsupported method");
361 match result {
362 Err(Error::Config(_)) => {} _ => panic!("Expected Config error, got {:?}", result),
364 }
365 }
366
367 async fn setup_test_server() -> (String, tokio::task::JoinHandle<()>) {
369 let app = Router::new()
371 .route("/get", get(|| async { StatusCode::OK }))
372 .route(
373 "/post",
374 post(|_: Json<serde_json::Value>| async { StatusCode::CREATED }),
375 )
376 .route(
377 "/put",
378 put(|_: Json<serde_json::Value>| async { StatusCode::OK }),
379 )
380 .route("/delete", delete(|| async { StatusCode::NO_CONTENT }))
381 .route(
382 "/patch",
383 patch(|_: Json<serde_json::Value>| async { StatusCode::OK }),
384 )
385 .route(
386 "/error",
387 get(|| async { StatusCode::INTERNAL_SERVER_ERROR.into_response() }),
388 );
389
390 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
392 let addr = listener.local_addr().unwrap();
393 let port = addr.port();
394 drop(listener); let server_handle = tokio::spawn(async move {
398 let addr = SocketAddr::from(([127, 0, 0, 1], port));
399 axum::Server::bind(&addr)
400 .serve(app.into_make_service())
401 .await
402 .unwrap();
403 });
404
405 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
407
408 (format!("http://127.0.0.1:{}", port), server_handle)
410 }
411
412 #[tokio::test]
414 async fn test_http_output_get_request() {
415 let (base_url, server_handle) = setup_test_server().await;
417 let url = format!("{}/get", base_url);
418
419 let config = HttpOutputConfig {
421 url,
422 method: "GET".to_string(),
423 timeout_ms: 5000,
424 retry_count: 3,
425 headers: None,
426 };
427
428 let output = HttpOutput::new(config).unwrap();
430 output.connect().await.unwrap();
431 let msg = MessageBatch::from_string("test message");
432 let result = output.write(&msg).await;
433
434 assert!(result.is_ok(), "Write should succeed with 200 OK response");
436
437 output.close().await.unwrap();
439 server_handle.abort();
440 }
441
442 #[tokio::test]
444 async fn test_http_output_post_request() {
445 let (base_url, server_handle) = setup_test_server().await;
447 let url = format!("{}/post", base_url);
448
449 let config = HttpOutputConfig {
451 url,
452 method: "POST".to_string(),
453 timeout_ms: 5000,
454 retry_count: 3,
455 headers: None,
456 };
457
458 let output = HttpOutput::new(config).unwrap();
460 output.connect().await.unwrap();
461
462 let msg = MessageBatch::from_string("{\"test\": \"message\"}");
464
465 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
467
468 let result = output.write(&msg).await;
469
470 assert!(
472 result.is_ok(),
473 "Write should succeed with 201 Created response"
474 );
475
476 output.close().await.unwrap();
478 server_handle.abort();
479 }
480
481 #[tokio::test]
483 async fn test_http_output_error_response() {
484 let (base_url, server_handle) = setup_test_server().await;
486 let url = format!("{}/error", base_url);
487
488 let config = HttpOutputConfig {
490 url,
491 method: "GET".to_string(),
492 timeout_ms: 5000,
493 retry_count: 0, headers: None,
495 };
496
497 let output = HttpOutput::new(config).unwrap();
499 output.connect().await.unwrap();
500 let msg = MessageBatch::from_string("test message");
501 let result = output.write(&msg).await;
502
503 assert!(result.is_err(), "Write should fail with error response");
505 match result {
506 Err(Error::Process(_)) => {} Err(Error::Connection(_)) => {} _ => panic!("Expected Processing or Connection error, got {:?}", result),
509 }
510
511 output.close().await.unwrap();
513 server_handle.abort();
514 }
515
516 #[tokio::test]
518 async fn test_http_output_retry() {
519 let (base_url, server_handle) = setup_test_server().await;
521
522 let url = format!("{}/nonexistent", base_url);
524
525 let config = HttpOutputConfig {
527 url,
528 method: "GET".to_string(),
529 timeout_ms: 1000,
530 retry_count: 2, headers: None,
532 };
533
534 let output = HttpOutput::new(config).unwrap();
536 output.connect().await.unwrap();
537 let msg = MessageBatch::from_string("test message");
538
539 let start = std::time::Instant::now();
541 let result = output.write(&msg).await;
542 let elapsed = start.elapsed();
543
544 assert!(result.is_err(), "Write should fail after retries");
546
547 assert!(
550 elapsed.as_millis() >= 300,
551 "Retry mechanism not working properly"
552 );
553
554 output.close().await.unwrap();
556 server_handle.abort();
557 }
558
559 #[tokio::test]
561 async fn test_http_output_builder() {
562 let config = json!({
564 "url": "http://example.com",
565 "method": "POST",
566 "timeout_ms": 5000,
567 "retry_count": 3
568 });
569
570 let builder = HttpOutputBuilder;
572 let result = builder.build(&Some(config));
573 assert!(
574 result.is_ok(),
575 "Builder should create output with valid config"
576 );
577
578 let result = builder.build(&None);
580 assert!(result.is_err(), "Builder should fail with missing config");
581
582 let invalid_config = json!({"invalid": "config"});
584 let result = builder.build(&Some(invalid_config));
585 assert!(result.is_err(), "Builder should fail with invalid config");
586 }
587}