#[cfg(feature = "worker-pg")]
use crate::config::service::worker::pg::PgWorkerServiceConfig;
#[cfg(feature = "worker-sidekiq")]
use crate::config::service::worker::sidekiq::SidekiqWorkerServiceConfig;
use config::{FileFormat, FileSourceString};
use serde_derive::{Deserialize, Serialize};
use serde_with::{serde_as, skip_serializing_none};
use std::collections::{BTreeMap, BTreeSet};
use strum_macros::{EnumString, IntoStaticStr};
use validator::Validate;
#[cfg(feature = "worker-pg")]
pub mod pg;
#[cfg(feature = "worker-sidekiq")]
pub mod sidekiq;
pub(crate) fn default_config() -> config::File<FileSourceString, FileFormat> {
config::File::from_str(include_str!("default.toml"), FileFormat::Toml)
}
#[serde_with::skip_serializing_none]
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub struct WorkerServiceConfig {
#[validate(nested)]
#[serde(default)]
pub enqueue_config: crate::worker::config::EnqueueConfig,
#[validate(nested)]
#[serde(default)]
pub worker_config: crate::worker::config::WorkerConfig,
#[cfg(feature = "worker-sidekiq")]
#[validate(nested)]
pub sidekiq: crate::config::service::ServiceConfig<WorkerConfig<SidekiqWorkerServiceConfig>>,
#[cfg(feature = "worker-pg")]
#[validate(nested)]
pub pg: crate::config::service::ServiceConfig<WorkerConfig<PgWorkerServiceConfig>>,
}
#[serde_with::skip_serializing_none]
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[non_exhaustive]
pub struct WorkerConfig<T: Validate> {
#[serde(flatten, default)]
#[validate(nested)]
pub common: CommonConfig,
#[serde(flatten)]
#[validate(nested)]
pub custom: T,
}
#[serde_as]
#[skip_serializing_none]
#[derive(Debug, Default, Validate, Clone, Serialize, Deserialize)]
#[serde(default, rename_all = "kebab-case")]
#[non_exhaustive]
pub struct CommonConfig {
#[serde(default = "CommonConfig::default_num_workers")]
pub num_workers: u32,
#[serde(default)]
pub balance_strategy: BalanceStrategy,
#[serde(default)]
pub queues: Option<BTreeSet<String>>,
#[serde(default)]
#[validate(nested)]
pub queue_config: BTreeMap<String, QueueConfig>,
}
impl CommonConfig {
fn default_num_workers() -> u32 {
num_cpus::get() as u32
}
}
#[derive(
Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, EnumString, IntoStaticStr,
)]
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
#[non_exhaustive]
pub enum BalanceStrategy {
#[default]
RoundRobin,
None,
}
#[serde_with::skip_serializing_none]
#[derive(Debug, Default, Validate, Clone, Serialize, Deserialize)]
#[serde(default, rename_all = "kebab-case")]
#[non_exhaustive]
pub struct QueueConfig {
pub num_workers: Option<u32>,
}
#[derive(
Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize, EnumString, IntoStaticStr,
)]
#[serde(rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
#[non_exhaustive]
pub enum StaleCleanUpBehavior {
Manual,
AutoCleanAll,
#[default]
AutoCleanStale,
}
#[cfg(test)]
mod tests {
#[test]
#[cfg_attr(coverage_nightly, coverage(off))]
fn default_num_workers() {
assert_eq!(
super::CommonConfig::default_num_workers(),
num_cpus::get() as u32
);
}
}
#[cfg(all(
test,
feature = "worker-sidekiq",
feature = "worker-pg",
feature = "worker-pg-install",
feature = "db-diesel-pool-async"
))]
mod deserialize_tests {
use super::*;
use crate::testing::snapshot::TestCase;
use insta::assert_toml_snapshot;
use rstest::{fixture, rstest};
#[fixture]
#[cfg_attr(coverage_nightly, coverage(off))]
fn case() -> TestCase {
Default::default()
}
#[rstest]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[enqueue-config]
queue = "default"
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[worker-config]
timeout = true
max-duration = 120000
[worker-config.retry-config]
max-retries = 10
delay = 10000
delay-offset = 20000
max-delay = 30000
backoff-strategy = "exponential"
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[worker-config.pg]
success-action = "archive"
failure-action = "delete"
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[worker-config.sidekiq]
disable-argument-coercion = true
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[sidekiq.redis.enqueue-pool]
min-idle = 1
max-connections = 2
[sidekiq.redis.fetch-pool]
min-idle = 3
max-connections = 4
[sidekiq.periodic]
stale-cleanup = "auto-clean-all"
[pg]
num-workers = 8
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[pg.database]
uri = "postgres://postgres:postgres@localhost:5432/example_dev"
connect-timeout = 1000
connect-lazy = false
acquire-timeout = 2000
idle-timeout = 10000
max-lifetime = 60000
min-connections = 1
max-connections = 2
test-on-checkout = false
retry-connection = false
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[pg.queue-fetch-config]
error-delay = 15000
empty-delay = 20000
[pg.periodic]
enable = false
stale-cleanup = "auto-clean-all"
"#
)]
#[case(
r#"
[sidekiq]
num-workers = 8
[sidekiq.redis]
uri = "redis://localhost:6379"
[pg]
num-workers = 8
[pg.install]
enable = true
"#
)]
#[cfg_attr(coverage_nightly, coverage(off))]
fn deserialize_tests(_case: TestCase, #[case] config: &str) {
let worker_service_config: WorkerServiceConfig = toml::from_str(config).unwrap();
assert_toml_snapshot!(worker_service_config);
}
}