use crate::{
events::EventConfig, priority::PriorityWeights, rate_limit::ThrottleConfig,
retry::RetryStrategy,
};
#[cfg(feature = "webhooks")]
use crate::webhooks::WebhookConfig;
#[cfg(feature = "alerting")]
use crate::alerting::AlertingConfig;
#[cfg(feature = "metrics")]
use crate::metrics::MetricsConfig;
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
use crate::streaming::StreamConfig;
use chrono::Duration;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, time::Duration as StdDuration};
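/// Serde helper for `std::time::Duration` fields, serializing them as
/// human-readable strings such as `"30s"`, `"5m"`, or `"2h"` using the
/// largest unit that divides the value evenly. Deserialization also accepts
/// a bare number of seconds (`"30"`) and a `d` suffix for whole days.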
mod duration_secs {
use serde::{Deserialize, Deserializer, Serializer};
use std::time::Duration;
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let secs = duration.as_secs();
if secs == 0 {
serializer.serialize_str("0s")
} else if secs % 3600 == 0 {
serializer.serialize_str(&format!("{}h", secs / 3600))
} else if secs % 60 == 0 {
serializer.serialize_str(&format!("{}m", secs / 60))
} else {
serializer.serialize_str(&format!("{}s", secs))
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
let s = String::deserialize(deserializer)?;
parse_duration(&s).map_err(D::Error::custom)
}
fn parse_duration(s: &str) -> Result<Duration, String> {
let s = s.trim();
if let Ok(secs) = s.parse::<u64>() {
return Ok(Duration::from_secs(secs));
}
// Reject strings that are too short or whose final byte is not a char
// boundary; `split_at` would otherwise panic on a multi-byte trailing
// character instead of returning an error.
if s.len() < 2 || !s.is_char_boundary(s.len() - 1) {
return Err(format!("Invalid duration format: {}", s));
}
let (num_str, suffix) = s.split_at(s.len() - 1);
let num: u64 = num_str
.parse()
.map_err(|_| format!("Invalid number in duration: {}", num_str))?;
match suffix {
"s" => Ok(Duration::from_secs(num)),
"m" => Ok(Duration::from_secs(num * 60)),
"h" => Ok(Duration::from_secs(num * 3600)),
"d" => Ok(Duration::from_secs(num * 86400)),
_ => Err(format!(
"Invalid duration suffix: {}. Use s, m, h, or d",
suffix
)),
}
}
}
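/// Serde helper for `chrono::Duration` fields expressed in whole days,
/// serialized as `"30d"`-style strings. Deserialization also accepts a bare
/// number of days.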
mod chrono_duration_days {
use chrono::Duration;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// `format!("{}d", 0)` already yields "0d", so a single path suffices.
let days = duration.num_days();
serializer.serialize_str(&format!("{}d", days))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
let s = String::deserialize(deserializer)?;
parse_chrono_duration(&s).map_err(D::Error::custom)
}
pub fn parse_chrono_duration(s: &str) -> Result<Duration, String> {
let s = s.trim();
if let Ok(days) = s.parse::<i64>() {
return Ok(Duration::days(days));
}
// Same char-boundary guard as in `duration_secs::parse_duration`, so a
// multi-byte trailing character yields an error rather than a panic.
if s.len() < 2 || !s.is_char_boundary(s.len() - 1) {
return Err(format!("Invalid duration format: {}", s));
}
let (num_str, suffix) = s.split_at(s.len() - 1);
let num: i64 = num_str
.parse()
.map_err(|_| format!("Invalid number in duration: {}", num_str))?;
match suffix {
"d" => Ok(Duration::days(num)),
_ => Err(format!(
"Invalid duration suffix: {}. Use d for days",
suffix
)),
}
}
}
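/// Serde helper for `Option<chrono::Duration>` day fields, delegating to
/// [`chrono_duration_days`] when a value is present.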
mod chrono_duration_days_option {
use chrono::Duration;
use serde::{Deserialize, Deserializer, Serializer};
pub fn serialize<S>(duration: &Option<Duration>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match duration {
Some(d) => super::chrono_duration_days::serialize(d, serializer),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<String> = Option::deserialize(deserializer)?;
match opt {
Some(s) => super::chrono_duration_days::parse_chrono_duration(&s)
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}
}
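/// Top-level configuration for Hammerwork, aggregating database, worker,
/// event, archive, rate-limiting, and logging settings. Sections for
/// webhooks, streaming, alerting, and metrics are only present when the
/// corresponding Cargo feature is enabled.
///
/// A minimal builder-style sketch (the `hammerwork::config` path is an
/// assumption and may differ in your crate layout):
///
/// ```ignore
/// use hammerwork::config::HammerworkConfig;
/// use std::time::Duration;
///
/// let config = HammerworkConfig::new()
///     .with_database_url("postgresql://localhost/myapp")
///     .with_worker_pool_size(8)
///     .with_job_timeout(Duration::from_secs(600));
/// ```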
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct HammerworkConfig {
pub database: DatabaseConfig,
pub worker: WorkerConfig,
pub events: EventConfig,
#[cfg(feature = "webhooks")]
pub webhooks: WebhookConfigs,
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
pub streaming: StreamingConfigs,
#[cfg(feature = "alerting")]
pub alerting: AlertingConfig,
#[cfg(feature = "metrics")]
pub metrics: MetricsConfig,
pub archive: ArchiveConfig,
pub rate_limiting: RateLimitingConfig,
pub logging: LoggingConfig,
}
impl HammerworkConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_database_url(mut self, url: &str) -> Self {
self.database.url = url.to_string();
self
}
pub fn with_database_pool_size(mut self, size: u32) -> Self {
self.database.pool_size = size;
self
}
pub fn with_worker_pool_size(mut self, size: usize) -> Self {
self.worker.pool_size = size;
self
}
pub fn with_job_timeout(mut self, timeout: StdDuration) -> Self {
self.worker.job_timeout = timeout;
self
}
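/// Enables or disables event buffering by sizing the event buffer:
/// 10,000 entries when enabled, zero when disabled.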
pub fn with_events_enabled(mut self, enabled: bool) -> Self {
if enabled {
self.events.max_buffer_size = 10_000;
} else {
self.events.max_buffer_size = 0;
}
self
}
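/// Loads a configuration from a TOML file at `path`.
///
/// A minimal sketch (`hammerwork.toml` is a hypothetical path):
///
/// ```ignore
/// let config = HammerworkConfig::from_file("hammerwork.toml")?;
/// ```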
pub fn from_file(path: &str) -> crate::Result<Self> {
let content = std::fs::read_to_string(path)?;
let config: Self = toml::from_str(&content)?;
Ok(config)
}
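/// Serializes the configuration as pretty-printed TOML and writes it to
/// `path`, the round-trip counterpart to [`Self::from_file`].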
pub fn save_to_file(&self, path: &str) -> crate::Result<()> {
let content = toml::to_string_pretty(self)?;
std::fs::write(path, content)?;
Ok(())
}
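/// Builds a configuration from the defaults, overriding individual fields
/// from the environment variables `HAMMERWORK_DATABASE_URL`,
/// `HAMMERWORK_DATABASE_POOL_SIZE`, `HAMMERWORK_WORKER_POOL_SIZE`,
/// `HAMMERWORK_JOB_TIMEOUT_SECONDS`, and `HAMMERWORK_EVENT_BUFFER_SIZE`.
/// Values that fail to parse are ignored and the defaults are kept.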
pub fn from_env() -> crate::Result<Self> {
let mut config = Self::default();
if let Ok(url) = std::env::var("HAMMERWORK_DATABASE_URL") {
config.database.url = url;
}
if let Ok(pool_size) = std::env::var("HAMMERWORK_DATABASE_POOL_SIZE") {
config.database.pool_size = pool_size.parse().unwrap_or(config.database.pool_size);
}
if let Ok(pool_size) = std::env::var("HAMMERWORK_WORKER_POOL_SIZE") {
config.worker.pool_size = pool_size.parse().unwrap_or(config.worker.pool_size);
}
if let Ok(timeout) = std::env::var("HAMMERWORK_JOB_TIMEOUT_SECONDS") {
if let Ok(seconds) = timeout.parse::<u64>() {
config.worker.job_timeout = StdDuration::from_secs(seconds);
}
}
if let Ok(buffer_size) = std::env::var("HAMMERWORK_EVENT_BUFFER_SIZE") {
config.events.max_buffer_size =
buffer_size.parse().unwrap_or(config.events.max_buffer_size);
}
Ok(config)
}
}
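/// Database connection settings. The default targets a local PostgreSQL
/// instance with a pool of 10 connections and a 30-second connection
/// timeout.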
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
pub url: String,
pub pool_size: u32,
pub connection_timeout_secs: u64,
pub auto_migrate: bool,
pub create_tables: bool,
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
url: "postgresql://localhost/hammerwork".to_string(),
pool_size: 10,
connection_timeout_secs: 30,
auto_migrate: false,
create_tables: true,
}
}
}
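/// Worker pool settings: pool sizing, polling cadence, per-job timeout,
/// priority weighting, retry strategy, and autoscaling bounds.
/// `polling_interval` and `job_timeout` serialize as human-readable
/// duration strings (see [`duration_secs`]).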
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
pub pool_size: usize,
#[serde(with = "duration_secs")]
pub polling_interval: StdDuration,
#[serde(with = "duration_secs")]
pub job_timeout: StdDuration,
pub priority_weights: PriorityWeights,
pub retry_strategy: RetryStrategy,
pub autoscaling_enabled: bool,
pub min_workers: usize,
pub max_workers: usize,
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
pool_size: 4,
polling_interval: StdDuration::from_millis(500),
job_timeout: StdDuration::from_secs(300),
priority_weights: PriorityWeights::default(),
retry_strategy: RetryStrategy::exponential(
StdDuration::from_secs(1),
2.0,
Some(StdDuration::from_secs(300)),
),
autoscaling_enabled: false,
min_workers: 1,
max_workers: 16,
}
}
}
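/// Collection of configured webhook endpoints plus settings shared across
/// all deliveries. Only compiled with the `webhooks` feature.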
#[cfg(feature = "webhooks")]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WebhookConfigs {
pub webhooks: Vec<WebhookConfig>,
pub global_settings: WebhookGlobalSettings,
}
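/// Delivery-wide webhook settings: concurrency and response-size limits,
/// delivery logging, and the HTTP `User-Agent` string.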
#[cfg(feature = "webhooks")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookGlobalSettings {
pub max_concurrent_deliveries: usize,
pub max_response_body_size: usize,
pub log_deliveries: bool,
pub user_agent: String,
}
#[cfg(feature = "webhooks")]
impl Default for WebhookGlobalSettings {
fn default() -> Self {
Self {
max_concurrent_deliveries: 100,
max_response_body_size: 64 * 1024,
log_deliveries: true,
user_agent: format!("hammerwork-webhooks/{}", env!("CARGO_PKG_VERSION")),
}
}
}
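/// Collection of configured stream processors plus settings shared across
/// all of them. Compiled with any of the streaming backend features.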
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StreamingConfigs {
pub streams: Vec<StreamConfig>,
pub global_settings: StreamingGlobalSettings,
}
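/// Minimal event filter used as a stand-in when the `webhooks` feature is
/// disabled. The default keeps `completed` and `failed` events for all
/// queues without payloads.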
#[cfg(not(feature = "webhooks"))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SimpleEventFilter {
pub event_types: Vec<String>,
pub queue_names: Vec<String>,
pub include_payload: bool,
}
#[cfg(not(feature = "webhooks"))]
impl Default for SimpleEventFilter {
fn default() -> Self {
Self {
event_types: vec!["completed".to_string(), "failed".to_string()],
queue_names: Vec::new(),
include_payload: false,
}
}
}
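/// Settings shared by all stream processors: a concurrency cap, operation
/// logging, and a global flush interval in seconds.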
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingGlobalSettings {
pub max_concurrent_processors: usize,
pub log_operations: bool,
pub global_flush_interval_secs: u64,
}
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
impl Default for StreamingGlobalSettings {
fn default() -> Self {
Self {
max_concurrent_processors: 50,
log_operations: true,
global_flush_interval_secs: 10,
}
}
}
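/// Settings for archiving completed jobs to disk. `archive_after` and
/// `delete_after` are whole-day durations serialized as `"30d"`-style
/// strings.
///
/// The corresponding TOML section, shown here with the default values:
///
/// ```toml
/// [archive]
/// enabled = false
/// archive_directory = "./archives"
/// compression_level = 6
/// archive_after = "30d"
/// delete_after = "365d"
/// max_file_size_bytes = 104857600
/// include_payloads = true
/// ```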
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveConfig {
pub enabled: bool,
pub archive_directory: PathBuf,
pub compression_level: u32,
#[serde(with = "chrono_duration_days")]
pub archive_after: Duration,
#[serde(with = "chrono_duration_days_option")]
pub delete_after: Option<Duration>,
pub max_file_size_bytes: u64,
pub include_payloads: bool,
}
impl Default for ArchiveConfig {
fn default() -> Self {
Self {
enabled: false,
archive_directory: PathBuf::from("./archives"),
compression_level: 6,
archive_after: Duration::days(30),
delete_after: Some(Duration::days(365)),
max_file_size_bytes: 100 * 1024 * 1024,
include_payloads: true,
}
}
}
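/// Rate-limiting settings: a default throttle plus per-queue overrides
/// keyed by queue name.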
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RateLimitingConfig {
pub enabled: bool,
pub default_throttle: ThrottleConfig,
pub queue_throttles: HashMap<String, ThrottleConfig>,
}
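/// Logging and tracing settings: log level, JSON formatting, source
/// location capture, and an optional tracing endpoint tagged with
/// `service_name`.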
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingConfig {
pub level: String,
pub json_format: bool,
pub include_location: bool,
pub enable_tracing: bool,
pub tracing_endpoint: Option<String>,
pub service_name: String,
}
impl Default for LoggingConfig {
fn default() -> Self {
Self {
level: "info".to_string(),
json_format: false,
include_location: false,
enable_tracing: false,
tracing_endpoint: None,
service_name: "hammerwork".to_string(),
}
}
}
impl HammerworkConfig {
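/// Returns a configuration tuned for local development: a
/// `hammerwork_dev` database with auto-migration, a small worker pool
/// with fast polling, verbose event logging, and debug-level logs with
/// source locations.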
pub fn development() -> Self {
Self {
database: DatabaseConfig {
url: "postgresql://localhost/hammerwork_dev".to_string(),
pool_size: 5,
auto_migrate: true,
..Default::default()
},
worker: WorkerConfig {
pool_size: 2,
polling_interval: StdDuration::from_millis(100),
..Default::default()
},
events: EventConfig {
max_buffer_size: 1000,
log_events: true,
..Default::default()
},
logging: LoggingConfig {
level: "debug".to_string(),
include_location: true,
..Default::default()
},
..Default::default()
}
}
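/// Returns a configuration tuned for production: a larger connection
/// pool, autoscaling workers, a bigger event buffer, archiving and rate
/// limiting enabled, and JSON logs with tracing.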
pub fn production() -> Self {
Self {
database: DatabaseConfig {
pool_size: 20,
connection_timeout_secs: 60,
auto_migrate: false,
..Default::default()
},
worker: WorkerConfig {
pool_size: 8,
autoscaling_enabled: true,
min_workers: 4,
max_workers: 32,
..Default::default()
},
events: EventConfig {
max_buffer_size: 50_000,
log_events: false,
..Default::default()
},
archive: ArchiveConfig {
enabled: true,
compression_level: 9,
..Default::default()
},
rate_limiting: RateLimitingConfig {
enabled: true,
..Default::default()
},
logging: LoggingConfig {
level: "info".to_string(),
json_format: true,
enable_tracing: true,
..Default::default()
},
..Default::default()
}
}
#[cfg(feature = "webhooks")]
pub fn add_webhook(mut self, webhook: WebhookConfig) -> Self {
self.webhooks.webhooks.push(webhook);
self
}
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
pub fn add_stream(mut self, stream: StreamConfig) -> Self {
self.streaming.streams.push(stream);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
use crate::streaming::StreamBackend;
use tempfile::tempdir;
#[test]
fn test_config_creation() {
let config = HammerworkConfig::new()
.with_database_url("postgresql://localhost/test")
.with_worker_pool_size(8)
.with_job_timeout(StdDuration::from_secs(600));
assert_eq!(config.database.url, "postgresql://localhost/test");
assert_eq!(config.worker.pool_size, 8);
assert_eq!(config.worker.job_timeout, StdDuration::from_secs(600));
}
#[test]
fn test_development_config() {
let config = HammerworkConfig::development();
assert_eq!(config.database.url, "postgresql://localhost/hammerwork_dev");
assert_eq!(config.worker.pool_size, 2);
assert!(config.database.auto_migrate);
assert_eq!(config.logging.level, "debug");
}
#[test]
fn test_production_config() {
let config = HammerworkConfig::production();
assert_eq!(config.database.pool_size, 20);
assert_eq!(config.worker.pool_size, 8);
assert!(config.worker.autoscaling_enabled);
assert!(config.archive.enabled);
assert!(config.rate_limiting.enabled);
assert!(config.logging.json_format);
}
#[test]
fn test_config_file_operations() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("hammerwork.toml");
let config = HammerworkConfig::new()
.with_database_url("mysql://localhost/test")
.with_worker_pool_size(6);
println!("Testing TOML serialization...");
let toml_result = toml::to_string_pretty(&config);
println!("TOML result: {:?}", toml_result);
config.save_to_file(config_path.to_str().unwrap()).unwrap();
let loaded_config = HammerworkConfig::from_file(config_path.to_str().unwrap()).unwrap();
assert_eq!(loaded_config.database.url, "mysql://localhost/test");
assert_eq!(loaded_config.worker.pool_size, 6);
}
#[test]
fn test_env_config() {
unsafe {
std::env::set_var("HAMMERWORK_DATABASE_URL", "postgresql://env/test");
std::env::set_var("HAMMERWORK_WORKER_POOL_SIZE", "12");
std::env::set_var("HAMMERWORK_JOB_TIMEOUT_SECONDS", "900");
}
let config = HammerworkConfig::from_env().unwrap();
assert_eq!(config.database.url, "postgresql://env/test");
assert_eq!(config.worker.pool_size, 12);
assert_eq!(config.worker.job_timeout, StdDuration::from_secs(900));
unsafe {
std::env::remove_var("HAMMERWORK_DATABASE_URL");
std::env::remove_var("HAMMERWORK_WORKER_POOL_SIZE");
std::env::remove_var("HAMMERWORK_JOB_TIMEOUT_SECONDS");
}
}
#[test]
fn test_duration_serialization() {
let dir = tempdir().unwrap();
let config_path = dir.path().join("duration_test.toml");
let mut config = HammerworkConfig::new();
config.worker.polling_interval = StdDuration::from_secs(30);
config.worker.job_timeout = StdDuration::from_secs(300);
config.save_to_file(config_path.to_str().unwrap()).unwrap();
let toml_content = std::fs::read_to_string(&config_path).unwrap();
assert!(toml_content.contains("polling_interval = \"30s\""));
assert!(toml_content.contains("job_timeout = \"5m\""));
let loaded_config = HammerworkConfig::from_file(config_path.to_str().unwrap()).unwrap();
assert_eq!(
loaded_config.worker.polling_interval,
StdDuration::from_secs(30)
);
assert_eq!(
loaded_config.worker.job_timeout,
StdDuration::from_secs(300)
);
let test_durations = [
("30", StdDuration::from_secs(30)),
("30s", StdDuration::from_secs(30)),
("5m", StdDuration::from_secs(300)),
("2h", StdDuration::from_secs(7200)),
("1d", StdDuration::from_secs(86400)),
];
for (duration_str, expected) in test_durations.iter() {
let toml_content = format!(
r#"
[database]
url = "postgresql://localhost/test"
pool_size = 10
connection_timeout_secs = 30
auto_migrate = false
create_tables = true
[worker]
pool_size = 4
polling_interval = "{}"
job_timeout = "5m"
autoscaling_enabled = false
min_workers = 1
max_workers = 10
[worker.priority_weights]
strict_priority = false
fairness_factor = 0.1
[worker.priority_weights.weights]
Background = 1
Low = 2
Normal = 5
High = 10
Critical = 20
[worker.retry_strategy]
type = "Exponential"
base_ms = 1000
multiplier = 2.0
max_delay_ms = 60000
[events]
max_buffer_size = 1000
include_payload_default = false
max_payload_size_bytes = 65536
log_events = false
[webhooks]
webhooks = []
[webhooks.global_settings]
max_concurrent_deliveries = 100
max_response_body_size = 65536
log_deliveries = true
user_agent = "hammerwork-webhooks/1.13.0"
[streaming]
streams = []
[streaming.global_settings]
max_concurrent_processors = 50
log_operations = true
global_flush_interval_secs = 10
[alerting]
targets = []
enabled = true
[alerting.cooldown_period]
secs = 300
nanos = 0
[alerting.custom_thresholds]
[metrics]
registry_name = "hammerwork"
collect_histograms = true
custom_gauges = []
custom_histograms = []
update_interval = 15
[metrics.custom_labels]
[archive]
enabled = false
archive_directory = "./archives"
compression_level = 6
archive_after = "30d"
delete_after = "365d"
max_file_size_bytes = 104857600
include_payloads = true
[rate_limiting]
enabled = false
[rate_limiting.default_throttle]
enabled = true
[rate_limiting.queue_throttles]
[logging]
level = "info"
json_format = false
include_location = false
enable_tracing = false
service_name = "hammerwork"
"#,
duration_str
);
let config: HammerworkConfig = toml::from_str(&toml_content).unwrap();
assert_eq!(
config.worker.polling_interval, *expected,
"Failed to parse duration: {}",
duration_str
);
}
}
#[cfg(feature = "webhooks")]
#[test]
fn test_webhook_config() {
let webhook = WebhookConfig {
name: "Test Webhook".to_string(),
url: "https://api.example.com/webhook".to_string(),
..Default::default()
};
let config = HammerworkConfig::new().add_webhook(webhook);
assert_eq!(config.webhooks.webhooks.len(), 1);
assert_eq!(config.webhooks.webhooks[0].name, "Test Webhook");
}
#[test]
#[cfg(any(
feature = "streaming",
feature = "kafka",
feature = "google-pubsub",
feature = "kinesis"
))]
fn test_stream_config() {
let stream = StreamConfig {
name: "Test Stream".to_string(),
backend: StreamBackend::PubSub {
project_id: "test-project".to_string(),
topic_name: "test-topic".to_string(),
service_account_key: None,
config: HashMap::new(),
},
..Default::default()
};
let config = HammerworkConfig::new().add_stream(stream);
assert_eq!(config.streaming.streams.len(), 1);
assert_eq!(config.streaming.streams[0].name, "Test Stream");
}
#[test]
fn test_default_configs() {
let database_config = DatabaseConfig::default();
assert_eq!(database_config.url, "postgresql://localhost/hammerwork");
assert_eq!(database_config.pool_size, 10);
let worker_config = WorkerConfig::default();
assert_eq!(worker_config.pool_size, 4);
assert_eq!(
worker_config.polling_interval,
StdDuration::from_millis(500)
);
let archive_config = ArchiveConfig::default();
assert!(!archive_config.enabled);
assert_eq!(archive_config.compression_level, 6);
let logging_config = LoggingConfig::default();
assert_eq!(logging_config.level, "info");
assert!(!logging_config.json_format);
}
}