arkflow_plugin/output/
http.rs

//! HTTP output component.
//!
//! Sends processed message batches to a configured HTTP endpoint.
4
5use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
6use arkflow_core::{Error, MessageBatch};
7use async_trait::async_trait;
8use reqwest::{header, Client};
9use serde::{Deserialize, Serialize};
10use std::sync::atomic::{AtomicBool, Ordering};
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14/// HTTP output configuration
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct HttpOutputConfig {
17    /// Destination URL
18    pub url: String,
19    /// HTTP method
20    pub method: String,
21    /// Timeout Period (ms)
22    pub timeout_ms: u64,
23    /// Number of retries
24    pub retry_count: u32,
25    /// Request header
26    pub headers: Option<std::collections::HashMap<String, String>>,
27}
28
29/// HTTP output component
30pub struct HttpOutput {
31    config: HttpOutputConfig,
32    client: Arc<Mutex<Option<Client>>>,
33    connected: AtomicBool,
34}
35
36impl HttpOutput {
37    /// Create a new HTTP output component
38    pub fn new(config: HttpOutputConfig) -> Result<Self, Error> {
39        Ok(Self {
40            config,
41            client: Arc::new(Mutex::new(None)),
42            connected: AtomicBool::new(false),
43        })
44    }
45}
46
47#[async_trait]
48impl Output for HttpOutput {
49    async fn connect(&self) -> Result<(), Error> {
50        // Create an HTTP client
51        let client_builder =
52            Client::builder().timeout(std::time::Duration::from_millis(self.config.timeout_ms));
53        let client_arc = self.client.clone();
54        client_arc.lock().await.replace(
55            client_builder.build().map_err(|e| {
56                Error::Connection(format!("Unable to create an HTTP client: {}", e))
57            })?,
58        );
59
60        self.connected.store(true, Ordering::SeqCst);
61        Ok(())
62    }
63
64    async fn write(&self, msg: &MessageBatch) -> Result<(), Error> {
65        let client_arc = self.client.clone();
66        let client_arc_guard = client_arc.lock().await;
67        if !self.connected.load(Ordering::SeqCst) || client_arc_guard.is_none() {
68            return Err(Error::Connection("The output is not connected".to_string()));
69        }
70
71        let client = client_arc_guard.as_ref().unwrap();
72        let content = msg.as_string()?;
73        if content.is_empty() {
74            return Ok(());
75        }
76        let body;
77        if content.len() == 1 {
78            body = content[0].clone();
79        } else {
80            body = serde_json::to_string(&content)
81                .map_err(|_| Error::Process("Unable to serialize message".to_string()))?;
82        }
83
84        // Build the request
85        let mut request_builder = match self.config.method.to_uppercase().as_str() {
86            "GET" => client.get(&self.config.url),
87            "POST" => client.post(&self.config.url).body(body), // Content-Type由统一逻辑添加
88            "PUT" => client.put(&self.config.url).body(body),
89            "DELETE" => client.delete(&self.config.url),
90            "PATCH" => client.patch(&self.config.url).body(body),
91            _ => {
92                return Err(Error::Config(format!(
93                    "HTTP methods that are not supported: {}",
94                    self.config.method
95                )))
96            }
97        };
98
99        // Add request headers
100        if let Some(headers) = &self.config.headers {
101            for (key, value) in headers {
102                request_builder = request_builder.header(key, value);
103            }
104        }
105
106        // Add content type header (if not specified)
107        // 始终添加Content-Type头(如果未指定)
108        if let Some(headers) = &self.config.headers {
109            if !headers.contains_key("Content-Type") {
110                request_builder = request_builder.header(header::CONTENT_TYPE, "application/json");
111            }
112        } else {
113            request_builder = request_builder.header(header::CONTENT_TYPE, "application/json");
114        }
115
116        // Send a request
117        let mut retry_count = 0;
118        let mut last_error = None;
119
120        while retry_count <= self.config.retry_count {
121            match request_builder.try_clone().unwrap().send().await {
122                Ok(response) => {
123                    if response.status().is_success() {
124                        return Ok(());
125                    } else {
126                        let status = response.status();
127                        let body = response
128                            .text()
129                            .await
130                            .unwrap_or_else(|_| "<Unable to read response body>".to_string());
131                        last_error = Some(Error::Process(format!(
132                            "HTTP Request Failed: Status code {}, response: {}",
133                            status, body
134                        )));
135                    }
136                }
137                Err(e) => {
138                    last_error = Some(Error::Connection(format!("HTTP request error: {}", e)));
139                }
140            }
141
142            retry_count += 1;
143            if retry_count <= self.config.retry_count {
144                // Index backoff retry
145                tokio::time::sleep(std::time::Duration::from_millis(
146                    100 * 2u64.pow(retry_count - 1),
147                ))
148                .await;
149            }
150        }
151
152        Err(last_error.unwrap_or_else(|| Error::Unknown("Unknown HTTP error".to_string())))
153    }
154
155    async fn close(&self) -> Result<(), Error> {
156        self.connected.store(false, Ordering::SeqCst);
157        let mut guard = self.client.lock().await;
158        *guard = None;
159        Ok(())
160    }
161}
162
163pub(crate) struct HttpOutputBuilder;
164impl OutputBuilder for HttpOutputBuilder {
165    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
166        if config.is_none() {
167            return Err(Error::Config(
168                "HTTP output configuration is missing".to_string(),
169            ));
170        }
171        let config: HttpOutputConfig = serde_json::from_value(config.clone().unwrap())?;
172
173        Ok(Arc::new(HttpOutput::new(config)?))
174    }
175}
176
177pub fn init() {
178    register_output_builder("http", Arc::new(HttpOutputBuilder));
179}
180
#[cfg(test)]
mod tests {
    use super::*;
    use axum::{
        extract::Json,
        http::StatusCode,
        response::IntoResponse,
        routing::{delete, get, patch, post, put},
        Router,
    };
    use serde_json::json;

    /// Returns a config targeting `url` with `method`, a 5s timeout, 3 retries and no extra
    /// headers: the baseline most tests start from.
    fn test_config(url: &str, method: &str) -> HttpOutputConfig {
        HttpOutputConfig {
            url: url.to_string(),
            method: method.to_string(),
            timeout_ms: 5000,
            retry_count: 3,
            headers: None,
        }
    }

    /// Test creating a new HTTP output component
    #[tokio::test]
    async fn test_http_output_new() {
        let output = HttpOutput::new(test_config("http://example.com", "POST"));
        assert!(output.is_ok(), "Failed to create HTTP output component");
    }

    /// Test connecting to the HTTP output
    #[tokio::test]
    async fn test_http_output_connect() {
        let output = HttpOutput::new(test_config("http://example.com", "POST")).unwrap();
        let result = output.connect().await;
        assert!(result.is_ok(), "Failed to connect HTTP output");
        assert!(
            output.connected.load(Ordering::SeqCst),
            "Connected flag not set"
        );

        // Verify client is initialized
        let client_guard = output.client.lock().await;
        assert!(client_guard.is_some(), "HTTP client not initialized");
    }

    /// Test closing the HTTP output
    #[tokio::test]
    async fn test_http_output_close() {
        let output = HttpOutput::new(test_config("http://example.com", "POST")).unwrap();
        output.connect().await.unwrap();
        let result = output.close().await;

        assert!(result.is_ok(), "Failed to close HTTP output");
        assert!(
            !output.connected.load(Ordering::SeqCst),
            "Connected flag not reset"
        );

        // Verify client is cleared
        let client_guard = output.client.lock().await;
        assert!(client_guard.is_none(), "HTTP client not cleared");
    }

    /// Test writing to HTTP output without connecting first
    #[tokio::test]
    async fn test_http_output_write_without_connect() {
        let output = HttpOutput::new(test_config("http://example.com", "POST")).unwrap();
        let msg = MessageBatch::from_string("test message");
        let result = output.write(&msg).await;

        assert!(
            matches!(result, Err(Error::Connection(_))),
            "Expected Connection error, got {:?}",
            result
        );
    }

    /// Test writing with empty message
    #[tokio::test]
    async fn test_http_output_write_empty_message() {
        let output = HttpOutput::new(test_config("http://example.com", "POST")).unwrap();
        output.connect().await.unwrap();

        // An empty batch is a no-op, so no request is sent to example.com.
        let msg = MessageBatch::new_binary(vec![]);
        let result = output.write(&msg).await;

        assert!(result.is_ok(), "Write should succeed with empty message");
    }

    /// Test HTTP output with custom headers
    #[tokio::test]
    async fn test_http_output_with_custom_headers() {
        let mut headers = std::collections::HashMap::new();
        headers.insert("X-Custom-Header".to_string(), "test-value".to_string());
        headers.insert("Content-Type".to_string(), "application/xml".to_string());

        let config = HttpOutputConfig {
            headers: Some(headers),
            ..test_config("http://example.com", "POST")
        };

        let output = HttpOutput::new(config).unwrap();
        output.connect().await.unwrap();

        // The headers actually sent are not inspected here; this only checks that a config
        // with custom headers initializes correctly.
        assert!(output.connected.load(Ordering::SeqCst));
    }

    /// Test HTTP output with unsupported method
    #[tokio::test]
    async fn test_http_output_unsupported_method() {
        let output = HttpOutput::new(test_config("http://example.com", "CONNECT")).unwrap();
        output.connect().await.unwrap();

        let msg = MessageBatch::from_string("test message");
        let result = output.write(&msg).await;

        assert!(
            matches!(result, Err(Error::Config(_))),
            "Expected Config error, got {:?}",
            result
        );
    }

    /// Starts a test HTTP server on an ephemeral local port and returns its base URL and
    /// task handle.
    async fn setup_test_server() -> (String, tokio::task::JoinHandle<()>) {
        // Create a router with endpoints for different HTTP methods
        let app = Router::new()
            .route("/get", get(|| async { StatusCode::OK }))
            .route(
                "/post",
                post(|_: Json<serde_json::Value>| async { StatusCode::CREATED }),
            )
            .route(
                "/put",
                put(|_: Json<serde_json::Value>| async { StatusCode::OK }),
            )
            .route("/delete", delete(|| async { StatusCode::NO_CONTENT }))
            .route(
                "/patch",
                patch(|_: Json<serde_json::Value>| async { StatusCode::OK }),
            )
            .route(
                "/error",
                get(|| async { StatusCode::INTERNAL_SERVER_ERROR.into_response() }),
            );

        // Bind an ephemeral port and hand that same listener to the server. Binding,
        // dropping and re-binding the port would race with anything else grabbing it in
        // between. Because the socket is already listening, connections made before the
        // server task is polled wait in the OS backlog, so no start-up sleep is needed.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();

        let server_handle = tokio::spawn(async move {
            axum::Server::from_tcp(listener)
                .unwrap()
                .serve(app.into_make_service())
                .await
                .unwrap();
        });

        (format!("http://127.0.0.1:{}", port), server_handle)
    }

    /// Starts a test server, writes `payload` to `path` using `method` and asserts that the
    /// write succeeds.
    async fn assert_write_succeeds(path: &str, method: &str, payload: &str) {
        let (base_url, server_handle) = setup_test_server().await;
        let url = format!("{}{}", base_url, path);

        let output = HttpOutput::new(test_config(&url, method)).unwrap();
        output.connect().await.unwrap();
        let msg = MessageBatch::from_string(payload);
        let result = output.write(&msg).await;

        assert!(
            result.is_ok(),
            "{} {} should succeed, got {:?}",
            method,
            path,
            result
        );

        output.close().await.unwrap();
        server_handle.abort();
    }

    /// Test HTTP output with successful GET request (200 OK)
    #[tokio::test]
    async fn test_http_output_get_request() {
        assert_write_succeeds("/get", "GET", "test message").await;
    }

    /// Test HTTP output with successful POST request (201 Created)
    #[tokio::test]
    async fn test_http_output_post_request() {
        // The /post handler parses the body as JSON, so the message must be valid JSON.
        assert_write_succeeds("/post", "POST", "{\"test\": \"message\"}").await;
    }

    /// Test the remaining supported methods and case-insensitive method matching
    #[tokio::test]
    async fn test_http_output_other_methods() {
        assert_write_succeeds("/put", "PUT", "{\"test\": \"message\"}").await;
        assert_write_succeeds("/patch", "PATCH", "{\"test\": \"message\"}").await;
        assert_write_succeeds("/delete", "DELETE", "test message").await;
        assert_write_succeeds("/post", "post", "{\"test\": \"message\"}").await;
    }

    /// Test HTTP output with error response
    #[tokio::test]
    async fn test_http_output_error_response() {
        let (base_url, server_handle) = setup_test_server().await;

        let config = HttpOutputConfig {
            retry_count: 0, // No retries
            ..test_config(&format!("{}/error", base_url), "GET")
        };

        let output = HttpOutput::new(config).unwrap();
        output.connect().await.unwrap();
        let msg = MessageBatch::from_string("test message");
        let result = output.write(&msg).await;

        // The server answers 500, i.e. a response was received, so the failure is reported
        // as a processing error rather than a connection error.
        assert!(
            matches!(result, Err(Error::Process(_))),
            "Expected Process error, got {:?}",
            result
        );

        output.close().await.unwrap();
        server_handle.abort();
    }

    /// Test HTTP output retry mechanism
    #[tokio::test]
    async fn test_http_output_retry() {
        let (base_url, server_handle) = setup_test_server().await;

        // No route matches /nonexistent, so every attempt receives a 404 response and fails.
        let config = HttpOutputConfig {
            timeout_ms: 1000,
            retry_count: 2, // 2 retries, i.e. 3 attempts in total
            ..test_config(&format!("{}/nonexistent", base_url), "GET")
        };

        let output = HttpOutput::new(config).unwrap();
        output.connect().await.unwrap();
        let msg = MessageBatch::from_string("test message");

        // Measure time to verify the retry delays
        let start = std::time::Instant::now();
        let result = output.write(&msg).await;
        let elapsed = start.elapsed();

        assert!(
            matches!(result, Err(Error::Process(_))),
            "Write should fail with a Process error after retries, got {:?}",
            result
        );

        // Backoff waits 100ms before the first retry and 200ms before the second.
        assert!(
            elapsed.as_millis() >= 300,
            "Retry mechanism not working properly"
        );

        output.close().await.unwrap();
        server_handle.abort();
    }

    /// Test HTTP output builder
    #[tokio::test]
    async fn test_http_output_builder() {
        let config = json!({
            "url": "http://example.com",
            "method": "POST",
            "timeout_ms": 5000,
            "retry_count": 3
        });

        let builder = HttpOutputBuilder;
        let result = builder.build(&Some(config));
        assert!(
            result.is_ok(),
            "Builder should create output with valid config"
        );

        // Missing configuration
        let result = builder.build(&None);
        assert!(result.is_err(), "Builder should fail with missing config");

        // Configuration without the required fields
        let invalid_config = json!({"invalid": "config"});
        let result = builder.build(&Some(invalid_config));
        assert!(result.is_err(), "Builder should fail with invalid config");
    }
}