#![allow(unused_crate_dependencies)]
#![allow(missing_docs)]
#[cfg(all(feature = "http_server", feature = "http_client"))]
use fiddler::Runtime;
#[cfg(all(feature = "http_server", feature = "http_client"))]
use std::time::Duration;
mod dependencies;
#[allow(unused_imports)]
use dependencies::{mock, output};
#[cfg(all(feature = "http_server", feature = "http_client"))]
use std::sync::Once;
#[cfg(all(feature = "http_server", feature = "http_client"))]
static REGISTER: Once = Once::new();
#[cfg(all(feature = "http_server", feature = "http_client"))]
fn register_test_plugins() {
REGISTER.call_once(|| {
mock::register_mock_input().unwrap();
output::register_validate().unwrap();
});
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
fn get_free_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
listener.local_addr().unwrap().port()
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
async fn wait_for_health(port: u16) {
let client = reqwest::Client::new();
let url = format!("http://127.0.0.1:{}/health", port);
for _ in 0..50 {
if client.get(&url).send().await.is_ok() {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
panic!("HTTP server on port {} did not become healthy", port);
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
#[tokio::test]
async fn http_output_to_http_server_no_auth() {
register_test_plugins();
let port = get_free_port();
let receiver_config = format!(
r#"input:
http_server:
address: "127.0.0.1"
port: {port}
path: "/ingest"
acknowledgment: true
num_threads: 1
processors: []
output:
validate:
expected:
- '{{"event":"login","user":"alice"}}'
- '{{"event":"logout","user":"bob"}}'"#
);
let receiver = tokio::spawn(async move {
let mut env = Runtime::from_config(&receiver_config).await.unwrap();
env.set_timeout(Some(Duration::from_secs(15))).unwrap();
env.run().await
});
wait_for_health(port).await;
let sender_config = format!(
r#"input:
mock_input:
input:
- '{{"event":"login","user":"alice"}}'
- '{{"event":"logout","user":"bob"}}'
num_threads: 1
processors: []
output:
http:
url: "http://127.0.0.1:{port}/ingest""#
);
let env = Runtime::from_config(&sender_config).await.unwrap();
env.run().await.unwrap();
let result = receiver.await.unwrap();
result.unwrap();
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
#[tokio::test]
async fn http_output_to_http_server_basic_auth() {
register_test_plugins();
let port = get_free_port();
let receiver_config = format!(
r#"input:
http_server:
address: "127.0.0.1"
port: {port}
path: "/ingest"
acknowledgment: true
auth:
type: basic
username: admin
password: secret123
num_threads: 1
processors: []
output:
validate:
expected:
- '{{"event":"login","user":"alice"}}'
- '{{"event":"logout","user":"bob"}}'"#
);
let receiver = tokio::spawn(async move {
let mut env = Runtime::from_config(&receiver_config).await.unwrap();
env.set_timeout(Some(Duration::from_secs(15))).unwrap();
env.run().await
});
wait_for_health(port).await;
let sender_config = format!(
r#"input:
mock_input:
input:
- '{{"event":"login","user":"alice"}}'
- '{{"event":"logout","user":"bob"}}'
num_threads: 1
processors: []
output:
http:
url: "http://127.0.0.1:{port}/ingest"
auth:
type: basic
username: admin
password: secret123"#
);
let env = Runtime::from_config(&sender_config).await.unwrap();
env.run().await.unwrap();
let result = receiver.await.unwrap();
result.unwrap();
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
#[tokio::test]
async fn http_output_to_http_server_bearer_auth() {
register_test_plugins();
let port = get_free_port();
let receiver_config = format!(
r#"input:
http_server:
address: "127.0.0.1"
port: {port}
path: "/ingest"
acknowledgment: true
auth:
type: bearer
token: my-secret-token
num_threads: 1
processors: []
output:
validate:
expected:
- '{{"event":"login","user":"alice"}}'
- '{{"event":"logout","user":"bob"}}'"#
);
let receiver = tokio::spawn(async move {
let mut env = Runtime::from_config(&receiver_config).await.unwrap();
env.set_timeout(Some(Duration::from_secs(15))).unwrap();
env.run().await
});
wait_for_health(port).await;
let sender_config = format!(
r#"input:
mock_input:
input:
- '{{"event":"login","user":"alice"}}'
- '{{"event":"logout","user":"bob"}}'
num_threads: 1
processors: []
output:
http:
url: "http://127.0.0.1:{port}/ingest"
auth:
type: bearer
token: my-secret-token"#
);
let env = Runtime::from_config(&sender_config).await.unwrap();
env.run().await.unwrap();
let result = receiver.await.unwrap();
result.unwrap();
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
#[tokio::test]
async fn http_batch_output_json_array_to_http_server() {
register_test_plugins();
let port = get_free_port();
let receiver_config = format!(
r#"input:
http_server:
address: "127.0.0.1"
port: {port}
path: "/ingest"
acknowledgment: true
num_threads: 1
processors: []
output:
validate:
expected:
- '{{"event":"click","id":1}}'
- '{{"event":"scroll","id":2}}'
- '{{"event":"submit","id":3}}'"#
);
let receiver = tokio::spawn(async move {
let mut env = Runtime::from_config(&receiver_config).await.unwrap();
env.set_timeout(Some(Duration::from_secs(15))).unwrap();
env.run().await
});
wait_for_health(port).await;
let sender_config = format!(
r#"input:
mock_input:
input:
- '{{"event":"click","id":1}}'
- '{{"event":"scroll","id":2}}'
- '{{"event":"submit","id":3}}'
num_threads: 1
processors: []
output:
http:
url: "http://127.0.0.1:{port}/ingest"
batch:
size: 10
duration: "1s"
format: "json_array""#
);
let env = Runtime::from_config(&sender_config).await.unwrap();
env.run().await.unwrap();
let result = receiver.await.unwrap();
result.unwrap();
}
#[cfg(all(feature = "http_server", feature = "http_client"))]
#[tokio::test]
async fn http_batch_output_json_array_bearer_auth() {
register_test_plugins();
let port = get_free_port();
let receiver_config = format!(
r#"input:
http_server:
address: "127.0.0.1"
port: {port}
path: "/ingest"
acknowledgment: true
auth:
type: bearer
token: batch-secret
num_threads: 1
processors: []
output:
validate:
expected:
- '{{"event":"click","id":1}}'
- '{{"event":"scroll","id":2}}'"#
);
let receiver = tokio::spawn(async move {
let mut env = Runtime::from_config(&receiver_config).await.unwrap();
env.set_timeout(Some(Duration::from_secs(15))).unwrap();
env.run().await
});
wait_for_health(port).await;
let sender_config = format!(
r#"input:
mock_input:
input:
- '{{"event":"click","id":1}}'
- '{{"event":"scroll","id":2}}'
num_threads: 1
processors: []
output:
http:
url: "http://127.0.0.1:{port}/ingest"
auth:
type: bearer
token: batch-secret
batch:
size: 10
duration: "1s"
format: "json_array""#
);
let env = Runtime::from_config(&sender_config).await.unwrap();
env.run().await.unwrap();
let result = receiver.await.unwrap();
result.unwrap();
}