use handlebars::Handlebars;
use jsonschema::{Draft, JSONSchema};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_yaml::Value;
use std::collections::HashMap;
use std::env;
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use tracing::{debug, error, trace};
use core::future::Future;
use std::pin::Pin;
use super::{Error, Input, Metrics, Output, Processor};
use crate::{InputBatch, OutputBatch};
mod registration;
mod validate;
pub use registration::register_plugin;
pub(crate) use validate::parse_configuration_item;
/// Constructor callback kept in the plugin registry (see `RegisteredItem::creator`).
///
/// Takes a YAML [`Value`] and returns a boxed, `Send` future that resolves to
/// the constructed [`ExecutionType`] or an [`Error`].
///
/// NOTE(review): the argument is presumably the plugin's own configuration
/// subtree; confirm against the `registration` / `validate` modules.
pub type Callback = fn(Value) -> Pin<Box<dyn Future<Output = Result<ExecutionType, Error>> + Send>>;
/// Category of a pluggable item; used as the outer key of the `ENV` registry.
///
/// The `Display` impl in this file renders the batch variants with the same
/// text as their non-batch counterparts.
#[derive(PartialEq, Eq, Hash, Clone)]
pub enum ItemType {
// Mirrors `ExecutionType::Input`.
Input,
// Mirrors `ExecutionType::InputBatch`.
InputBatch,
// Mirrors `ExecutionType::Output`.
Output,
// Mirrors `ExecutionType::OutputBatch`.
OutputBatch,
// Mirrors `ExecutionType::Processor`.
Processor,
// Mirrors `ExecutionType::Metrics`.
Metrics,
}
/// Renders the item type as a lowercase label.
///
/// Batch variants intentionally share the label of their non-batch
/// counterpart (`"input"` / `"output"`), and processors use the plural
/// `"processors"`.
impl fmt::Display for ItemType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            ItemType::Input | ItemType::InputBatch => "input",
            ItemType::Output | ItemType::OutputBatch => "output",
            ItemType::Processor => "processors",
            ItemType::Metrics => "metrics",
        };
        f.write_str(label)
    }
}
/// A constructed item, boxed as a `Send + Sync` trait object; one variant per
/// supported trait. This is the success value produced by a [`Callback`].
pub enum ExecutionType {
// Boxed implementation of the `Input` trait.
Input(Box<dyn Input + Send + Sync>),
// Boxed implementation of the `InputBatch` trait.
InputBatch(Box<dyn InputBatch + Send + Sync>),
// Boxed implementation of the `Output` trait.
Output(Box<dyn Output + Send + Sync>),
// Boxed implementation of the `OutputBatch` trait.
OutputBatch(Box<dyn OutputBatch + Send + Sync>),
// Boxed implementation of the `Processor` trait.
Processor(Box<dyn Processor + Send + Sync>),
// Boxed implementation of the `Metrics` trait.
Metrics(Box<dyn Metrics + Send + Sync>),
}
/// Lazily-initialised, lock-protected registry of plugins.
///
/// The outer map is keyed by [`ItemType`]; each inner map goes from a plugin
/// name to its [`RegisteredItem`]. Every item type gets an (initially empty)
/// inner map on first access.
static ENV: Lazy<RwLock<HashMap<ItemType, HashMap<String, RegisteredItem>>>> = Lazy::new(|| {
    let mut registry = HashMap::new();
    for kind in [
        ItemType::Input,
        ItemType::InputBatch,
        ItemType::Output,
        ItemType::OutputBatch,
        ItemType::Processor,
        ItemType::Metrics,
    ] {
        registry.insert(kind, HashMap::new());
    }
    RwLock::new(registry)
});
/// A registry entry: the callback that builds the item together with the
/// schema describing its configuration.
#[derive(Clone)]
pub(crate) struct RegisteredItem {
// Callback that constructs the item.
pub creator: Callback,
// Schema (raw text plus compiled form) associated with the item.
pub format: ConfigSpec,
}
/// A constructor callback paired with a YAML configuration value.
///
/// `Config::validate` obtains these from `parse_configuration_item` and stores
/// them in [`ParsedConfig`].
#[derive(Clone)]
pub(crate) struct ParsedRegisteredItem {
// Callback that constructs the item.
pub creator: Callback,
// YAML value held alongside the callback.
// NOTE(review): presumably the value later passed to `creator`; confirm at the call site.
pub config: Value,
}
/// One configuration entry (used for `input`, `output` and each element of
/// `processors` in [`Config`]).
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Item {
// Optional label for this entry.
pub label: Option<String>,
// Optional retry policy attached to this entry.
pub retry: Option<crate::RetryPolicy>,
// All remaining keys of the entry, captured via `#[serde(flatten)]`.
// `Config::validate` rejects input/output entries with more than one key here.
#[serde(flatten)]
pub extra: HashMap<String, Value>,
}
/// Configuration of the `metrics` section.
///
/// `Default` is implemented by hand so that `MetricsConfig::default()` agrees
/// with what deserialising an empty mapping yields; a derived `Default` would
/// set `interval` to `0` instead of [`MetricsConfig::default_interval`].
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct MetricsConfig {
    /// Optional label for the metrics entry.
    pub label: Option<String>,
    /// Reporting interval; defaults to `300` when omitted.
    /// NOTE(review): the unit is presumably seconds; confirm with the consumer.
    #[serde(default = "MetricsConfig::default_interval")]
    pub interval: u64,
    /// Whether system metrics are collected; defaults to `false`.
    #[serde(default)]
    pub collect_system_metrics: bool,
    /// All remaining keys of the section, captured via `#[serde(flatten)]`.
    #[serde(flatten)]
    pub extra: HashMap<String, Value>,
}

impl MetricsConfig {
    /// Interval used when the configuration does not specify one.
    fn default_interval() -> u64 {
        300
    }
}

impl Default for MetricsConfig {
    /// Mirrors the serde defaults field by field.
    fn default() -> Self {
        Self {
            label: None,
            interval: Self::default_interval(),
            collect_system_metrics: false,
            extra: HashMap::new(),
        }
    }
}
/// Top-level pipeline configuration as deserialised from YAML.
///
/// Turn it into a [`ParsedConfig`] with `Config::validate`.
#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
// Optional label for the whole pipeline.
pub label: Option<String>,
// Requested thread count; `validate` falls back to `num_cpus::get()` when absent.
pub num_threads: Option<usize>,
// Optional metrics section.
pub metrics: Option<MetricsConfig>,
// `Item` is `pub(crate)` while this field is `pub`, hence the lint allowance.
#[allow(private_interfaces)]
pub input: Item,
// Processor entries, in configuration order. Same lint allowance as `input`.
#[allow(private_interfaces)]
pub processors: Vec<Item>,
// Output entry. Same lint allowance as `input`.
#[allow(private_interfaces)]
pub output: Item,
}
impl FromStr for Config {
type Err = Error;
fn from_str(conf: &str) -> Result<Self, Self::Err> {
Self::from_env(conf)
}
}
impl Config {
fn parse_with_variables(
conf: &str,
variables: &HashMap<String, String>,
) -> Result<Self, Error> {
let mut handle_bars = Handlebars::new();
handle_bars.set_strict_mode(true);
let populated_config = handle_bars.render_template(conf, variables).map_err(|e| {
Error::ConfigFailedValidation(format!(
"Handlebars template error: {e}. Check your variable interpolations."
))
})?;
let config: Config = serde_yaml::from_str(&populated_config).map_err(|e| {
Error::ConfigFailedValidation(format!(
"YAML parsing error after variable substitution: {e}"
))
})?;
Ok(config)
}
pub fn from_env(conf: &str) -> Result<Self, Error> {
let environment_variables: HashMap<String, String> = env::vars().collect();
Self::parse_with_variables(conf, &environment_variables)
}
pub fn from_variables(conf: &str, variables: HashMap<String, String>) -> Result<Self, Error> {
Self::parse_with_variables(conf, &variables)
}
pub fn from_env_with_overrides(
conf: &str,
overrides: HashMap<String, String>,
) -> Result<Self, Error> {
let mut variables: HashMap<String, String> = env::vars().collect();
variables.extend(overrides);
Self::parse_with_variables(conf, &variables)
}
}
impl Config {
pub async fn validate(self) -> Result<ParsedConfig, Error> {
if self.input.extra.len() > 1 {
error!("input must only contain one entry");
return Err(Error::Validation(
"input must only contain one entry".into(),
));
};
if self.output.extra.len() > 1 {
error!("output must only contain one entry");
return Err(Error::Validation(
"output must only contain one entry".into(),
));
};
let input = match parse_configuration_item(ItemType::Input, &self.input.extra).await {
Ok(i) => i,
Err(e) => match e {
Error::ConfigurationItemNotFound(_) => {
parse_configuration_item(ItemType::InputBatch, &self.input.extra).await?
}
_ => return Err(e),
},
};
let output = match parse_configuration_item(ItemType::Output, &self.output.extra).await {
Ok(i) => i,
Err(e) => match e {
Error::ConfigurationItemNotFound(_) => {
parse_configuration_item(ItemType::OutputBatch, &self.output.extra).await?
}
_ => return Err(e),
},
};
let mut processors = Vec::new();
for p in &self.processors {
let proc = parse_configuration_item(ItemType::Processor, &p.extra).await?;
processors.push(proc);
}
let num_threads = self.num_threads.unwrap_or(num_cpus::get());
trace!("Num threads are {}", num_threads);
let label = self.label.clone();
let metrics = self.metrics.clone();
debug!("configuration is valid");
Ok(ParsedConfig {
label,
input,
input_retry: self.input.retry.clone(),
processors,
num_threads,
metrics,
output,
output_retry: self.output.retry.clone(),
})
}
}
/// Result of `Config::validate`: the configuration with every entry resolved
/// to a registered item.
#[derive(Clone)]
pub struct ParsedConfig {
// Label copied from `Config::label`.
pub label: Option<String>,
// Configured thread count, or `num_cpus::get()` when none was configured.
pub num_threads: usize,
// Metrics section copied from `Config::metrics`.
pub metrics: Option<MetricsConfig>,
// `ParsedRegisteredItem` is `pub(crate)` while this field is `pub`, hence the lint allowance.
#[allow(private_interfaces)]
pub input: ParsedRegisteredItem,
// Retry policy taken from the input entry.
pub input_retry: Option<crate::RetryPolicy>,
// Resolved processors, in configuration order. Same lint allowance as `input`.
#[allow(private_interfaces)]
pub processors: Vec<ParsedRegisteredItem>,
// Resolved output. Same lint allowance as `input`.
#[allow(private_interfaces)]
pub output: ParsedRegisteredItem,
// Retry policy taken from the output entry.
pub output_retry: Option<crate::RetryPolicy>,
}
/// A configuration schema: the original schema text plus its compiled
/// JSON Schema form. Built with `ConfigSpec::from_schema`.
pub struct ConfigSpec {
// Schema text exactly as passed to `from_schema`.
raw_schema: String,
// Compiled schema, shared between clones through the `Arc`.
schema: Arc<JSONSchema>,
}
/// Return type of `export_schemas`: exported plugin-type name -> (plugin name -> raw schema text).
pub type SchemaExport = HashMap<String, HashMap<String, String>>;
pub fn export_schemas() -> Result<SchemaExport, Error> {
crate::modules::register_plugins()?;
let lock = ENV.read().map_err(|_| Error::UnableToSecureLock)?;
let plugin_types = [
(ItemType::Input, "input"),
(ItemType::InputBatch, "input_batch"),
(ItemType::Processor, "processors"),
(ItemType::Metrics, "metrics"),
(ItemType::Output, "output"),
(ItemType::OutputBatch, "output_batch"),
];
let results: SchemaExport = plugin_types
.iter()
.filter_map(|(item_type, export_name)| {
lock.get(item_type).map(|plugins| {
let schemas: HashMap<String, String> = plugins
.iter()
.map(|(name, registered_item)| {
(name.clone(), registered_item.format.raw_schema.clone())
})
.collect();
(export_name.to_string(), schemas)
})
})
.collect();
if results.is_empty() {
debug!("No plugin schemas found in registry");
}
Ok(results)
}
/// Debug output shows only the raw schema text; the compiled schema is left
/// out of the rendering.
impl std::fmt::Debug for ConfigSpec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut rendering = f.debug_struct("ConfigSpec");
        rendering.field("raw_schema", &self.raw_schema);
        rendering.finish()
    }
}
impl Clone for ConfigSpec {
fn clone(&self) -> Self {
ConfigSpec {
raw_schema: self.raw_schema.clone(),
schema: Arc::clone(&self.schema),
}
}
}
impl ConfigSpec {
pub fn from_schema(conf: &str) -> Result<Self, Error> {
let v: Value = serde_yaml::from_str(conf)?;
let intermediate = serde_json::to_string(&v)?;
let f: serde_json::Value = serde_json::from_str(&intermediate)?;
let schema: JSONSchema = match JSONSchema::options().with_draft(Draft::Draft7).compile(&f) {
Ok(js) => js,
Err(e) => return Err(Error::InvalidValidationSchema(format!("{e}"))),
};
trace!("json schema is valid");
Ok(ConfigSpec {
raw_schema: conf.into(),
schema: Arc::new(schema),
})
}
pub fn validate(&self, content: &str) -> Result<(), Error> {
let v: Value = serde_yaml::from_str(content)?;
let intermediate = serde_json::to_string(&v)?;
let f: serde_json::Value = serde_json::from_str(&intermediate)?;
let result = self.schema.validate(&f);
if let Err(errors) = result {
let errs: Vec<String> = errors.into_iter().map(|i| format!("{}", i)).collect();
error!(
number_of_failures = errs.len(),
errors = errs.join(" "),
"validation failed"
);
return Err(Error::ConfigFailedValidation(errs.join(" ")));
};
Ok(())
}
}
// Unit tests for configuration parsing, schema validation and template
// variable resolution.
//
// NOTE(review): several tests mutate the process-wide environment with
// `std::env::set_var` / `remove_var`; the variable names are unique per test,
// but confirm this is acceptable under the parallel test runner.
#[cfg(test)]
mod test {
use super::*;
// A representative pipeline document deserialises into `Config` directly
// through serde_yaml (no template rendering involved).
#[test]
fn load_configuration() {
let input = "input:
stdin:
scanner:
lines: {}
processors:
- label: my_cool_mapping
mapping: |
root.message = this
root.meta.link_count = this.links.length()
output:
label: my_s3_output
aws_s3:
bucket: TODO
path: TODO";
let _v: Config = serde_yaml::from_str(input).unwrap();
}
// A document whose `lines` value is a boolean passes a schema that declares
// `lines` as boolean.
#[test]
fn validate_configuration_item() {
let input = "scanner:
lines: true";
let schema = "properties:
scanner:
type: object
properties:
lines:
type: boolean";
let conf = ConfigSpec::from_schema(schema).unwrap();
conf.validate(input).unwrap();
}
// The same document must be rejected when the schema declares `lines` as a
// number.
#[test]
fn expect_schema_failure() {
let input = "scanner:
lines: true";
let schema = "properties:
scanner:
type: object
properties:
lines:
type: number";
let conf = ConfigSpec::from_schema(schema).unwrap();
if let Ok(_) = conf.validate(input) {
panic!("expected error, none received")
}
}
// `from_variables` substitutes values from the supplied map.
#[test]
fn from_variables_uses_provided_map() {
let conf_str = r#"input:
stdin: {}
processors: []
output:
file:
path: {{OUTPUT_PATH}}"#;
let mut vars = HashMap::new();
vars.insert("OUTPUT_PATH".to_string(), "/test/output.txt".to_string());
let config = Config::from_variables(conf_str, vars).unwrap();
let output_yaml = serde_yaml::to_string(&config.output).unwrap();
assert!(
output_yaml.contains("/test/output.txt"),
"Expected substituted path in output config"
);
}
// `from_variables` must not fall back to the environment: the variable is
// set in the environment but absent from the (empty) map, so parsing fails.
#[test]
fn from_variables_does_not_use_env() {
std::env::set_var("FIDDLER_TEST_VAR_ISOLATED", "env_value");
let conf_str = r#"input:
stdin: {}
processors: []
output:
file:
path: {{FIDDLER_TEST_VAR_ISOLATED}}"#;
let vars = HashMap::new();
let result = Config::from_variables(conf_str, vars);
assert!(
result.is_err(),
"Expected error when variable not in provided map"
);
std::env::remove_var("FIDDLER_TEST_VAR_ISOLATED");
}
// `from_env` substitutes values taken from the process environment.
#[test]
fn from_env_reads_environment_variables() {
std::env::set_var("FIDDLER_TEST_ENV_PATH", "/env/path.txt");
let conf_str = r#"input:
stdin: {}
processors: []
output:
file:
path: {{FIDDLER_TEST_ENV_PATH}}"#;
let config = Config::from_env(conf_str).unwrap();
let output_yaml = serde_yaml::to_string(&config.output).unwrap();
assert!(
output_yaml.contains("/env/path.txt"),
"Expected env variable to be substituted"
);
std::env::remove_var("FIDDLER_TEST_ENV_PATH");
}
// When a name exists both in the environment and in the overrides, the
// override value is the one that ends up in the configuration.
#[test]
fn from_env_with_overrides_prefers_overrides() {
std::env::set_var("FIDDLER_TEST_OVERRIDE_PATH", "env_value");
let conf_str = r#"input:
stdin: {}
processors: []
output:
file:
path: {{FIDDLER_TEST_OVERRIDE_PATH}}"#;
let mut overrides = HashMap::new();
overrides.insert(
"FIDDLER_TEST_OVERRIDE_PATH".to_string(),
"override_value".to_string(),
);
let config = Config::from_env_with_overrides(conf_str, overrides).unwrap();
let output_yaml = serde_yaml::to_string(&config.output).unwrap();
assert!(
output_yaml.contains("override_value"),
"Expected override to take precedence over env"
);
assert!(
!output_yaml.contains("env_value"),
"Should not contain env value"
);
std::env::remove_var("FIDDLER_TEST_OVERRIDE_PATH");
}
// Overrides apply per variable: PATH_A is overridden, PATH_B still comes
// from the environment.
#[test]
fn from_env_with_overrides_uses_env_when_no_override() {
std::env::set_var("FIDDLER_TEST_PATH_A", "/path/a");
std::env::set_var("FIDDLER_TEST_PATH_B", "/path/b");
let conf_str = r#"input:
file:
path: {{FIDDLER_TEST_PATH_A}}
codec: Lines
processors: []
output:
file:
path: {{FIDDLER_TEST_PATH_B}}"#;
let mut overrides = HashMap::new();
overrides.insert("FIDDLER_TEST_PATH_A".to_string(), "/override/a".to_string());
let config = Config::from_env_with_overrides(conf_str, overrides).unwrap();
let input_yaml = serde_yaml::to_string(&config.input).unwrap();
let output_yaml = serde_yaml::to_string(&config.output).unwrap();
assert!(
input_yaml.contains("/override/a"),
"PATH_A should use override"
);
assert!(
output_yaml.contains("/path/b"),
"PATH_B should use env value"
);
std::env::remove_var("FIDDLER_TEST_PATH_A");
std::env::remove_var("FIDDLER_TEST_PATH_B");
}
// Referencing an undefined variable fails with `ConfigFailedValidation`
// whose message mentions the Handlebars template error.
#[test]
fn missing_variable_in_strict_mode_returns_error() {
let conf_str = r#"input:
stdin: {}
processors: []
output:
file:
path: {{UNDEFINED_VARIABLE_XYZ}}"#;
std::env::remove_var("UNDEFINED_VARIABLE_XYZ");
let result = Config::from_env(conf_str);
assert!(result.is_err(), "Expected error for missing variable");
if let Err(Error::ConfigFailedValidation(msg)) = result {
assert!(
msg.contains("Handlebars template error"),
"Error should mention template error: {}",
msg
);
} else {
panic!("Expected ConfigFailedValidation error");
}
}
// A `retry` key is deserialised into `Item::retry` and does not leak into
// the flattened `extra` map.
#[test]
fn test_item_with_retry() {
let yaml = r#"
retry:
max_retries: 5
initial_wait: "2s"
http:
url: "https://example.com"
"#;
let item: Item = serde_yaml::from_str(yaml).unwrap();
assert!(item.retry.is_some());
let retry = item.retry.unwrap();
assert_eq!(retry.max_retries, 5);
assert_eq!(retry.initial_wait, std::time::Duration::from_secs(2));
assert!(item.extra.contains_key("http"));
assert!(!item.extra.contains_key("retry"));
}
// Without a `retry` key, `Item::retry` is `None` and the plugin key is still
// captured in `extra`.
#[test]
fn test_item_without_retry() {
let yaml = r#"
http:
url: "https://example.com"
"#;
let item: Item = serde_yaml::from_str(yaml).unwrap();
assert!(item.retry.is_none());
assert!(item.extra.contains_key("http"));
}
// `str::parse::<Config>()` (the `FromStr` impl) resolves variables from the
// process environment.
#[test]
fn from_str_uses_environment_variables() {
std::env::set_var("FIDDLER_FROM_STR_TEST", "/fromstr/path");
let conf_str = r#"input:
stdin: {}
processors: []
output:
file:
path: {{FIDDLER_FROM_STR_TEST}}"#;
let config: Config = conf_str.parse().unwrap();
let output_yaml = serde_yaml::to_string(&config.output).unwrap();
assert!(
output_yaml.contains("/fromstr/path"),
"FromStr should use environment variables"
);
std::env::remove_var("FIDDLER_FROM_STR_TEST");
}
}