use chrono::NaiveTime;
use chrono_tz::Tz;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Duration;
use crate::error::{Error, Result};
/// Top-level configuration: system-wide settings plus the pipeline definition.
///
/// Both sections fall back to their `Default` value when absent from the YAML.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Config {
/// System-wide settings. Also accepted under the YAML key `global`.
#[serde(default, alias = "global")]
pub system: SystemConfig,
/// Sources, transforms and sinks that make up the processing graph.
#[serde(default)]
pub pipeline: PipelineConfig,
}
/// The user-declared pipeline graph. Each list defaults to empty when omitted.
///
/// Ids must be unique across all three lists (enforced by `Config::validate`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PipelineConfig {
/// Components that produce data; each must be used as a transform input.
#[serde(default)]
pub sources: Vec<SourceConfig>,
/// Components that connect sources/transforms to sinks/transforms.
#[serde(default)]
pub transforms: Vec<TransformConfig>,
/// Components that consume data; each must be used as a transform output.
#[serde(default)]
pub sinks: Vec<SinkConfig>,
}
impl Config {
    /// Parses a configuration from YAML text.
    ///
    /// `${VAR}` and `${VAR:-default}` placeholders are substituted from the process
    /// environment before parsing (see `Config::expand_env_vars`). The result is
    /// neither normalized nor validated; [`Config::from_file`] does both.
    ///
    /// # Errors
    ///
    /// Fails if a placeholder without a default names a variable that
    /// `std::env::var` cannot read, or if the expanded text does not deserialize
    /// into a [`Config`].
    pub fn from_yaml(yaml: &str) -> Result<Self> {
        let expanded = Self::expand_env_vars(yaml)?;
        Ok(serde_yaml::from_str(&expanded)?)
    }

    /// Substitutes `${VAR}` and `${VAR:-default}` placeholders in `input`.
    ///
    /// Variable names must match `[a-zA-Z_][a-zA-Z0-9_]*`. The default may be empty
    /// but cannot contain `}`. It is used whenever `std::env::var` fails, that is,
    /// when the variable is unset or not valid Unicode. Substitution is purely textual
    /// and runs over the raw YAML, so placeholders inside YAML comments are expanded too.
    ///
    /// # Errors
    ///
    /// Returns a config error naming every variable that could not be read and
    /// has no default.
    fn expand_env_vars(input: &str) -> Result<String> {
        use regex::{Captures, Regex};
        use std::sync::LazyLock;
        static ENV_VAR_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"\$\{([a-zA-Z_][a-zA-Z0-9_]*)(?::-([^}]*))?\}")
                .expect("ENV_VAR_RE is a valid constant pattern")
        });
        // Collect every unresolved name, in order of first appearance, so one run
        // reports all of them instead of only the last one encountered.
        let mut missing: Vec<String> = Vec::new();
        let result = ENV_VAR_RE.replace_all(input, |caps: &Captures| {
            let var_name = &caps[1];
            match std::env::var(var_name) {
                Ok(val) => val,
                Err(_) => match caps.get(2) {
                    Some(default) => default.as_str().to_string(),
                    None => {
                        if !missing.iter().any(|name| name == var_name) {
                            missing.push(var_name.to_string());
                        }
                        String::new()
                    }
                },
            }
        });
        if !missing.is_empty() {
            let quoted: Vec<String> = missing.iter().map(|name| format!("'{}'", name)).collect();
            // Keep the single-variable wording identical to the historical message.
            let detail = if quoted.len() == 1 {
                format!("environment variable {} not found", quoted[0])
            } else {
                format!("environment variables {} not found", quoted.join(", "))
            };
            return Err(Error::config(format!(
                "Failed to expand environment variables: {}",
                detail
            )));
        }
        Ok(result.into_owned())
    }

    /// Loads, normalizes and validates a configuration from `path`.
    ///
    /// If `path` is a directory, every regular file in it with a `.yaml` or `.yml`
    /// extension is parsed in sorted path order and combined with [`Config::merge`].
    /// Later files therefore override earlier scalar settings and append to the
    /// pipeline lists. Otherwise `path` is read as a single YAML file.
    ///
    /// # Errors
    ///
    /// Fails on I/O errors, when a directory contains no YAML files, on
    /// environment-expansion or parse errors, and when the result fails
    /// [`Config::validate`].
    pub fn from_file(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();
        let mut config = if path.is_dir() {
            let mut entries: Vec<_> = std::fs::read_dir(path)?
                // NOTE(review): unreadable directory entries are skipped silently.
                .filter_map(|e| e.ok())
                .map(|e| e.path())
                // Only regular files. A sub-directory named e.g. `extra.yaml` would
                // otherwise reach `read_to_string` and abort loading.
                .filter(|p| {
                    p.extension()
                        .is_some_and(|ext| ext == "yaml" || ext == "yml")
                        && p.is_file()
                })
                .collect();
            entries.sort();
            if entries.is_empty() {
                return Err(Error::config(format!(
                    "No .yaml or .yml files found in directory: {:?}",
                    path
                )));
            }
            let mut merged = Config::default();
            for entry in entries {
                let content = std::fs::read_to_string(&entry)?;
                merged.merge(Self::from_yaml(&content)?);
            }
            merged
        } else {
            let content = std::fs::read_to_string(path)?;
            Self::from_yaml(&content)?
        };
        config.normalize()?;
        config.validate()?;
        Ok(config)
    }

    /// Overlays `other` onto `self`.
    ///
    /// Optional scalar settings in `other.system` replace the current value only
    /// when set. The `sinks` and `notify` sections are merged field by field.
    /// Pipeline sources, transforms and sinks are appended; duplicate ids are
    /// reported later by [`Config::validate`].
    pub fn merge(&mut self, other: Config) {
        let Self { system, pipeline } = other;
        if system.output_buffer_size.is_some() {
            self.system.output_buffer_size = system.output_buffer_size;
        }
        if system.channel_size.is_some() {
            self.system.channel_size = system.channel_size;
        }
        if system.shutdown_timeout.is_some() {
            self.system.shutdown_timeout = system.shutdown_timeout;
        }
        if system.metrics_log_interval.is_some() {
            self.system.metrics_log_interval = system.metrics_log_interval;
        }
        // Must be merged like the other scalars. In directory mode the result starts
        // from `Config::default()`, so an unmerged field would always be lost.
        if system.data_dir.is_some() {
            self.system.data_dir = system.data_dir;
        }
        self.system.sinks.merge(&system.sinks);
        self.system.notify.merge(&system.notify);
        self.pipeline.sources.extend(pipeline.sources);
        self.pipeline.transforms.extend(pipeline.transforms);
        self.pipeline.sinks.extend(pipeline.sinks);
    }

    /// Makes transform-to-transform links symmetric.
    ///
    /// Whenever transform `A` lists transform `B` in its `outputs`, `A` is appended
    /// to `B.inputs`, unless it is already present or `A == B`. Running this twice
    /// has no further effect. It currently always returns `Ok`.
    pub fn normalize(&mut self) -> Result<()> {
        let transform_index: HashMap<String, usize> = self
            .pipeline
            .transforms
            .iter()
            .enumerate()
            .map(|(idx, transform)| (transform.id.clone(), idx))
            .collect();
        // Gather edges first. `outputs` is read through a shared borrow, while the
        // `inputs` updates below need mutable access to the transforms.
        let mut edges = Vec::new();
        for transform in &self.pipeline.transforms {
            for output in &transform.outputs {
                if transform_index.contains_key(output) {
                    edges.push((transform.id.clone(), output.clone()));
                }
            }
        }
        for (upstream, downstream) in edges {
            // Self-references are rejected by validation; don't record them as inputs.
            if upstream == downstream {
                continue;
            }
            if let Some(&idx) = transform_index.get(&downstream) {
                let inputs = &mut self.pipeline.transforms[idx].inputs;
                if !inputs.contains(&upstream) {
                    inputs.push(upstream);
                }
            }
        }
        Ok(())
    }

    /// Returns every `(from, to)` edge declared by transforms.
    ///
    /// Edges come from both `inputs` (`input -> transform`) and `outputs`
    /// (`transform -> output`). An edge declared on both ends, as every
    /// transform-to-transform edge is after [`Config::normalize`], appears twice.
    pub fn input_edges(&self) -> Vec<(&str, &str)> {
        let mut edges = Vec::new();
        for transform in &self.pipeline.transforms {
            for input in &transform.inputs {
                edges.push((input.as_str(), transform.id.as_str()));
            }
            for output in &transform.outputs {
                edges.push((transform.id.as_str(), output.as_str()));
            }
        }
        edges
    }

    /// Checks the configuration for structural errors without modifying it.
    ///
    /// Validation runs on a normalized copy, so the outcome is the same whether or
    /// not [`Config::normalize`] was already called.
    ///
    /// # Errors
    ///
    /// Returns a config error describing the first problem found.
    pub fn validate(&self) -> Result<()> {
        let mut normalized = self.clone();
        normalized.normalize()?;
        normalized.validate_inner()
    }

    /// Performs the actual validation; expects `self` to be normalized.
    fn validate_inner(&self) -> Result<()> {
        // System-level settings.
        if matches!(self.system.output_buffer_size, Some(0)) {
            return Err(Error::config(
                "system.output_buffer_size must be greater than 0",
            ));
        }
        if matches!(self.system.channel_size, Some(0)) {
            return Err(Error::config("system.channel_size must be greater than 0"));
        }
        if let Some(silence) = &self.system.notify.silence
            && let Some(window) = silence.window
            && window.is_zero()
        {
            return Err(Error::config(
                "system.notify.silence.window must be greater than 0",
            ));
        }
        if let Some(active_window) = &self.system.notify.active_window {
            validate_notify_active_window("system.notify.active_window", active_window)?;
        }

        // Component ids must be unique across sources, transforms and sinks.
        let mut ids: HashSet<&str> = HashSet::new();
        let mut source_ids: HashSet<&str> = HashSet::new();
        let mut sink_ids: HashSet<&str> = HashSet::new();
        // Built-in endpoints may be referenced by transforms, but user-declared
        // sources/sinks may not reuse their ids.
        let system_source_ids: HashSet<&str> = crate::source::system::VALID_SYSTEM_SOURCE_IDS
            .iter()
            .copied()
            .collect();
        let system_sink_ids: HashSet<&str> = crate::sink::system::VALID_SYSTEM_SINK_IDS
            .iter()
            .copied()
            .collect();
        for source in &self.pipeline.sources {
            if matches!(source.output_buffer_size, Some(0)) {
                return Err(Error::config(format!(
                    "Source '{}' has output_buffer_size of 0 (must be greater than 0)",
                    source.id
                )));
            }
            if !ids.insert(source.id.as_str()) {
                return Err(Error::config(format!("Duplicate source ID: {}", source.id)));
            }
            source_ids.insert(source.id.as_str());
            if system_source_ids.contains(source.id.as_str()) {
                return Err(Error::config(format!(
                    "Source '{}' is reserved for system use",
                    source.id
                )));
            }
        }
        let mut transform_ids: HashSet<&str> = HashSet::new();
        for transform in &self.pipeline.transforms {
            if matches!(transform.output_buffer_size, Some(0)) {
                return Err(Error::config(format!(
                    "Transform '{}' has output_buffer_size of 0 (must be greater than 0)",
                    transform.id
                )));
            }
            if !ids.insert(transform.id.as_str()) {
                return Err(Error::config(format!(
                    "Duplicate transform ID: {}",
                    transform.id
                )));
            }
            transform_ids.insert(transform.id.as_str());
            if transform.inputs.is_empty() {
                return Err(Error::config(format!(
                    "Transform '{}' has no inputs defined",
                    transform.id
                )));
            }
            if transform.outputs.is_empty() {
                return Err(Error::config(format!(
                    "Transform '{}' has no outputs defined",
                    transform.id
                )));
            }
        }
        for sink in &self.pipeline.sinks {
            if !ids.insert(sink.id.as_str()) {
                return Err(Error::config(format!("Duplicate sink ID: {}", sink.id)));
            }
            sink_ids.insert(sink.id.as_str());
            if system_sink_ids.contains(sink.id.as_str()) {
                return Err(Error::config(format!(
                    "Sink '{}' is reserved for system use",
                    sink.id
                )));
            }
        }

        // Every transform reference must resolve. Record which components are used
        // along the way.
        let mut used_source_ids: HashSet<&str> = HashSet::new();
        let mut used_sink_ids: HashSet<&str> = HashSet::new();
        let mut used_transform_ids: HashSet<&str> = HashSet::new();
        for transform in &self.pipeline.transforms {
            for input_str in &transform.inputs {
                let input = input_str.as_str();
                let is_source = source_ids.contains(input) || system_source_ids.contains(input);
                let is_transform = transform_ids.contains(input);
                if !is_source && !is_transform {
                    return Err(Error::config(format!(
                        "Transform '{}' references unknown input '{}' (must be source or transform)",
                        transform.id, input
                    )));
                }
                if input == transform.id {
                    return Err(Error::config(format!(
                        "Transform '{}' cannot reference itself as input",
                        transform.id
                    )));
                }
                if source_ids.contains(input) {
                    used_source_ids.insert(input);
                }
                if is_transform {
                    used_transform_ids.insert(input);
                }
            }
            for output_str in &transform.outputs {
                let output = output_str.as_str();
                let is_sink = sink_ids.contains(output) || system_sink_ids.contains(output);
                let is_transform = transform_ids.contains(output);
                if !is_sink && !is_transform {
                    return Err(Error::config(format!(
                        "Transform '{}' references unknown output '{}' (must be sink or transform)",
                        transform.id, output
                    )));
                }
                if output == transform.id {
                    return Err(Error::config(format!(
                        "Transform '{}' cannot reference itself as output",
                        transform.id
                    )));
                }
                if sink_ids.contains(output) {
                    used_sink_ids.insert(output);
                }
                // A transform counts as used when it feeds a sink, or when it is
                // linked to another transform on either end.
                if is_sink {
                    used_transform_ids.insert(transform.id.as_str());
                }
                if is_transform {
                    used_transform_ids.insert(output);
                }
            }
        }

        // Declared components that nothing references are configuration mistakes.
        for source in &self.pipeline.sources {
            if !used_source_ids.contains(source.id.as_str()) {
                return Err(Error::config(format!(
                    "Source '{}' is not used by any transform",
                    source.id
                )));
            }
        }
        for sink in &self.pipeline.sinks {
            if !used_sink_ids.contains(sink.id.as_str()) {
                return Err(Error::config(format!(
                    "Sink '{}' is not used by any transform",
                    sink.id
                )));
            }
        }
        for transform in &self.pipeline.transforms {
            if !used_transform_ids.contains(transform.id.as_str()) {
                return Err(Error::config(format!(
                    "Transform '{}' is not used by any transform or sink",
                    transform.id
                )));
            }
        }
        self.validate_no_cycles()?;
        Ok(())
    }

    /// Rejects any directed cycle in the graph built from [`Config::input_edges`].
    ///
    /// Start nodes are visited in sorted order, so the reported node does not
    /// depend on `HashMap` iteration order.
    fn validate_no_cycles(&self) -> Result<()> {
        let mut graph: HashMap<&str, Vec<&str>> = HashMap::new();
        for source in &self.pipeline.sources {
            graph.entry(source.id.as_str()).or_default();
        }
        for transform in &self.pipeline.transforms {
            graph.entry(transform.id.as_str()).or_default();
        }
        for sink in &self.pipeline.sinks {
            graph.entry(sink.id.as_str()).or_default();
        }
        for (from, to) in self.input_edges() {
            graph.entry(from).or_default().push(to);
        }
        let mut state: HashMap<&str, u8> = HashMap::new();
        let mut nodes: Vec<&str> = graph.keys().copied().collect();
        nodes.sort_unstable();
        for node in nodes {
            if let Some(cycle_node) = Self::dfs_detect_cycle(node, &graph, &mut state) {
                return Err(Error::config(format!(
                    "Cycle detected in pipeline at node '{}'",
                    cycle_node
                )));
            }
        }
        Ok(())
    }

    /// Recursive depth-first search from `node` using three-state marking.
    ///
    /// A node absent from `state` is unvisited, `VISITING` means it is on the
    /// current path, and `DONE` means it is fully explored. Returns the first
    /// node reached that is already on the current path (it closes a cycle).
    /// Recursion depth is bounded by the longest path in the graph.
    fn dfs_detect_cycle<'a>(
        node: &'a str,
        graph: &HashMap<&str, Vec<&'a str>>,
        state: &mut HashMap<&'a str, u8>,
    ) -> Option<&'a str> {
        const VISITING: u8 = 1;
        const DONE: u8 = 2;
        match state.get(node).copied() {
            Some(DONE) => return None,
            Some(VISITING) => return Some(node),
            _ => {}
        }
        state.insert(node, VISITING);
        if let Some(neighbors) = graph.get(node) {
            for &neighbor in neighbors {
                if let Some(cycle_node) = Self::dfs_detect_cycle(neighbor, graph, state) {
                    return Some(cycle_node);
                }
            }
        }
        state.insert(node, DONE);
        None
    }
}
/// Fallback for `system.output_buffer_size` when it is not configured.
const DEFAULT_OUTPUT_BUFFER_SIZE: usize = 1024;
/// Fallback for `system.channel_size` when it is not configured.
const DEFAULT_SYSTEM_CHANNEL_SIZE: usize = 256;
/// Fallback for `system.shutdown_timeout`, in seconds, when it is not configured.
const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 5;
/// System-wide runtime settings (`system:` or `global:` in YAML).
///
/// Every field may be omitted. The accessor methods on `SystemConfig` supply
/// fallbacks for the optional scalars.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemConfig {
/// System-level output buffer size; must be greater than 0 when set.
/// `output_buffer_size()` falls back to `DEFAULT_OUTPUT_BUFFER_SIZE`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_buffer_size: Option<usize>,
/// Channel size; must be greater than 0 when set.
/// `channel_size()` falls back to `DEFAULT_SYSTEM_CHANNEL_SIZE`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub channel_size: Option<usize>,
/// Shutdown timeout as a humantime duration string (e.g. `10s`).
/// `shutdown_timeout()` falls back to `DEFAULT_SHUTDOWN_TIMEOUT_SECS`.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde::option"
)]
pub shutdown_timeout: Option<Duration>,
/// Metrics logging interval as a humantime duration string.
/// `metrics_log_interval()` falls back to 60 seconds.
#[serde(
default,
skip_serializing_if = "Option::is_none",
with = "humantime_serde::option"
)]
pub metrics_log_interval: Option<Duration>,
/// Data directory path; `data_dir()` falls back to `./data`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data_dir: Option<String>,
/// Settings for the built-in system sinks (see `crate::sink::system`).
#[serde(default)]
pub sinks: crate::sink::system::SystemSinksConfig,
/// Notification settings (silencing and active window).
#[serde(default)]
pub notify: SystemNotifyConfig,
}
impl SystemConfig {
    /// Fallback for [`SystemConfig::metrics_log_interval`], in seconds.
    const DEFAULT_METRICS_LOG_INTERVAL_SECS: u64 = 60;
    /// Fallback for [`SystemConfig::data_dir`].
    const DEFAULT_DATA_DIR: &'static str = "./data";

    /// Returns the configured output buffer size, or `DEFAULT_OUTPUT_BUFFER_SIZE`.
    pub fn output_buffer_size(&self) -> usize {
        self.output_buffer_size
            .unwrap_or(DEFAULT_OUTPUT_BUFFER_SIZE)
    }

    /// Returns the configured channel size, or `DEFAULT_SYSTEM_CHANNEL_SIZE`.
    pub fn channel_size(&self) -> usize {
        self.channel_size.unwrap_or(DEFAULT_SYSTEM_CHANNEL_SIZE)
    }

    /// Returns the configured shutdown timeout, or `DEFAULT_SHUTDOWN_TIMEOUT_SECS` seconds.
    pub fn shutdown_timeout(&self) -> Duration {
        self.shutdown_timeout
            .unwrap_or(Duration::from_secs(DEFAULT_SHUTDOWN_TIMEOUT_SECS))
    }

    /// Returns the configured metrics logging interval, or 60 seconds.
    pub fn metrics_log_interval(&self) -> Duration {
        self.metrics_log_interval
            .unwrap_or(Duration::from_secs(Self::DEFAULT_METRICS_LOG_INTERVAL_SECS))
    }

    /// Returns an owned copy of the configured data directory, or `./data`.
    pub fn data_dir(&self) -> String {
        self.data_dir
            .clone()
            .unwrap_or_else(|| Self::DEFAULT_DATA_DIR.to_string())
    }
}
/// Notification settings under `system.notify`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemNotifyConfig {
/// Optional silencing settings; `window` must be greater than 0 when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub silence: Option<SystemNotifySilenceConfig>,
/// Optional time-of-day window, checked by `validate_notify_active_window`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active_window: Option<SystemNotifyActiveWindowConfig>,
}
impl SystemNotifyConfig {
    /// Overlays `other` onto `self`.
    ///
    /// Each sub-section present in `other` is merged field by field into the
    /// existing one, or copied in whole when `self` does not have it yet.
    /// Sub-sections absent from `other` leave `self` untouched.
    pub fn merge(&mut self, other: &SystemNotifyConfig) {
        if let Some(incoming) = &other.silence {
            match self.silence.as_mut() {
                Some(existing) => existing.merge(incoming),
                None => self.silence = Some(incoming.clone()),
            }
        }
        if let Some(incoming) = &other.active_window {
            match self.active_window.as_mut() {
                Some(existing) => existing.merge(incoming),
                None => self.active_window = Some(incoming.clone()),
            }
        }
    }
}
/// Day of the week accepted in the `days` list of a notification active window.
///
/// Serialized in lowercase: `mon`, `tue`, `wed`, `thu`, `fri`, `sat`, `sun`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NotifyActiveWindowDay {
Mon,
Tue,
Wed,
Thu,
Fri,
Sat,
Sun,
}
impl NotifyActiveWindowDay {
pub fn weekday(self) -> chrono::Weekday {
match self {
NotifyActiveWindowDay::Mon => chrono::Weekday::Mon,
NotifyActiveWindowDay::Tue => chrono::Weekday::Tue,
NotifyActiveWindowDay::Wed => chrono::Weekday::Wed,
NotifyActiveWindowDay::Thu => chrono::Weekday::Thu,
NotifyActiveWindowDay::Fri => chrono::Weekday::Fri,
NotifyActiveWindowDay::Sat => chrono::Weekday::Sat,
NotifyActiveWindowDay::Sun => chrono::Weekday::Sun,
}
}
}
/// Time-of-day window for notifications (`system.notify.active_window`).
///
/// Every field is optional in YAML, but validation requires `start` and `end`
/// whenever the window is present.
/// NOTE(review): how the window gates delivery is implemented elsewhere; confirm
/// against the notifier.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemNotifyActiveWindowConfig {
/// Window start as `HH:MM`; surrounding whitespace is ignored by validation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub start: Option<String>,
/// Window end as `HH:MM`; must differ from `start`.
/// NOTE(review): an end earlier than start presumably wraps past midnight; confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub end: Option<String>,
/// Time zone name parsed with `chrono_tz::Tz` (e.g. `Europe/Berlin`); must be
/// non-empty when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timezone: Option<String>,
/// Weekdays on which the window applies; must be non-empty when set.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub days: Option<Vec<NotifyActiveWindowDay>>,
/// NOTE(review): presumably notifications at or above this severity ignore the
/// window; confirm against the notifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bypass_severity: Option<crate::common::types::Severity>,
}
impl SystemNotifyActiveWindowConfig {
    /// Copies every field that is set in `other` onto `self`; fields left unset in
    /// `other` keep their current value.
    fn merge(&mut self, other: &SystemNotifyActiveWindowConfig) {
        if let Some(start) = &other.start {
            self.start = Some(start.clone());
        }
        if let Some(end) = &other.end {
            self.end = Some(end.clone());
        }
        if let Some(timezone) = &other.timezone {
            self.timezone = Some(timezone.clone());
        }
        if let Some(days) = &other.days {
            self.days = Some(days.clone());
        }
        if let Some(severity) = other.bypass_severity {
            self.bypass_severity = Some(severity);
        }
    }
}
/// Optional silencing settings for notifications (`system.notify.silence`).
///
/// NOTE(review): the silencing behavior itself is implemented elsewhere; confirm
/// its exact semantics against the notifier.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SystemNotifySilenceConfig {
    /// Silence window as a humantime duration string (e.g. `30s`, `5m`).
    /// Must be greater than 0 when set (checked by `Config::validate`).
    // Omitted from serialized output when unset, like every other optional field in
    // this module, instead of being emitted as `window: null`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        with = "humantime_serde::option"
    )]
    pub window: Option<Duration>,
    /// Optional key string.
    /// NOTE(review): presumably selects how notifications are grouped for silencing;
    /// verify against the consumer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub key: Option<String>,
}
impl SystemNotifySilenceConfig {
    /// Copies the fields that are set in `other` onto `self`, keeping the current
    /// value for anything `other` leaves unset.
    fn merge(&mut self, other: &SystemNotifySilenceConfig) {
        if let Some(window) = other.window {
            self.window = Some(window);
        }
        if let Some(key) = &other.key {
            self.key = Some(key.clone());
        }
    }
}
/// Validates a notification active-window configuration.
///
/// `start` and `end` are required. Both must parse as `HH:MM` and must differ.
/// `timezone`, if given, must be a non-empty name accepted by `chrono_tz::Tz`.
/// `days`, if given, must be non-empty. `label` is the config path used to prefix
/// every error message (e.g. `system.notify.active_window`).
fn validate_notify_active_window(
    label: &str,
    config: &SystemNotifyActiveWindowConfig,
) -> Result<()> {
    let Some(start) = config.start.as_deref() else {
        return Err(Error::config(format!(
            "{}.start is required when active_window is set",
            label
        )));
    };
    let Some(end) = config.end.as_deref() else {
        return Err(Error::config(format!(
            "{}.end is required when active_window is set",
            label
        )));
    };

    let opens_at = parse_notify_active_window_time(&format!("{}.start", label), start)?;
    let closes_at = parse_notify_active_window_time(&format!("{}.end", label), end)?;
    if opens_at == closes_at {
        return Err(Error::config(format!(
            "{}.start and {}.end must not be equal",
            label, label
        )));
    }

    if let Some(tz) = config.timezone.as_deref() {
        if tz.trim().is_empty() {
            return Err(Error::config(format!(
                "{}.timezone must be non-empty",
                label
            )));
        }
        if let Err(e) = tz.parse::<Tz>() {
            return Err(Error::config(format!(
                "{}.timezone has invalid value '{}': {}",
                label, tz, e
            )));
        }
    }

    if config.days.as_ref().is_some_and(|days| days.is_empty()) {
        return Err(Error::config(format!("{}.days must not be empty", label)));
    }
    Ok(())
}
fn parse_notify_active_window_time(label: &str, value: &str) -> Result<NaiveTime> {
let trimmed = value.trim();
if trimmed.is_empty() {
return Err(Error::config(format!("{} must be non-empty", label)));
}
NaiveTime::parse_from_str(trimmed, "%H:%M").map_err(|e| {
Error::config(format!(
"{} must be in HH:MM format (got '{}'): {}",
label, value, e
))
})
}
/// A user-declared pipeline source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SourceConfig {
/// Unique component id; must not collide with any other component or a
/// reserved system source id.
pub id: String,
/// Source kind, written as `type` in YAML.
#[serde(rename = "type")]
pub source_type: String,
/// Optional output buffer size for this source; must be greater than 0 when set.
#[serde(default)]
pub output_buffer_size: Option<usize>,
/// Type-specific settings, kept as raw YAML and not inspected by `Config::validate`.
#[serde(default)]
pub config: serde_yaml::Value,
}
/// A pipeline transform that connects upstream components to downstream ones.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransformConfig {
/// Unique component id.
pub id: String,
/// Ids of sources (user or system) or other transforms feeding this transform.
/// `Config::normalize` also adds any transform that lists this one in `outputs`.
#[serde(default)]
pub inputs: Vec<String>,
/// Ids of sinks (user or system) or other transforms this transform feeds.
#[serde(default)]
pub outputs: Vec<String>,
/// Processing steps. NOTE(review): presumably applied in list order; confirm.
#[serde(default)]
pub steps: Vec<StepConfig>,
/// Optional output buffer size for this transform; must be greater than 0 when set.
#[serde(default)]
pub output_buffer_size: Option<usize>,
}
/// One processing step inside a transform.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepConfig {
/// Step kind, written as `type` in YAML.
#[serde(rename = "type")]
pub step_type: String,
/// Type-specific settings, kept as raw YAML.
#[serde(default)]
pub config: serde_yaml::Value,
}
/// A user-declared pipeline sink.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SinkConfig {
/// Unique component id; must not collide with any other component or a
/// reserved system sink id.
pub id: String,
/// Sink kind, written as `type` in YAML.
#[serde(rename = "type")]
pub sink_type: String,
/// Type-specific settings, kept as raw YAML and not inspected by `Config::validate`.
#[serde(default)]
pub config: serde_yaml::Value,
}
// Unit tests for this module live in a separate, out-of-line module file.
#[cfg(test)]
mod tests;