use cistell_core::Config;
use serde::{Deserialize, Serialize};
use crate::status::ConcurrencyControlType;
/// Error returned when a configuration value cannot be parsed from text.
///
/// The wrapped `String` is the complete, human-readable message.
///
/// `Clone`, `PartialEq` and `Eq` are derived so callers can duplicate the
/// error and compare it directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigParseError(pub String);
impl std::fmt::Display for ConfigParseError {
    /// Writes the wrapped message verbatim, without any prefix or padding.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
// Marker impl: every `std::error::Error` method keeps its provided default,
// so this only opts the type into the standard error trait.
impl std::error::Error for ConfigParseError {}
/// Log output format; it is the type of `AppConfig::log_format`.
///
/// The `FromStr` impl in this file accepts `"text"` and `"json"` in any
/// letter case, and the `Default` impl yields `Text`.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum LogFormat {
/// NOTE(review): presumably plain, human-readable log lines — confirm against the logging setup.
Text,
/// NOTE(review): presumably JSON-structured log records — confirm against the logging setup.
Json,
}
impl Default for LogFormat {
/// Returns `LogFormat::Text`, the same value `AppConfig` uses as its default `log_format`.
fn default() -> Self {
Self::Text
}
}
impl std::str::FromStr for LogFormat {
    type Err = ConfigParseError;

    /// Parses a log format name, ignoring letter case.
    ///
    /// # Errors
    ///
    /// Returns a `ConfigParseError` quoting the input exactly as given when
    /// it is neither `text` nor `json`. Surrounding whitespace is not trimmed.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        let format = match normalized.as_str() {
            "text" => Self::Text,
            "json" => Self::Json,
            // The message reports the caller's original spelling, not the lowercased copy.
            _ => return Err(ConfigParseError(format!("invalid LogFormat: {s}"))),
        };
        Ok(format)
    }
}
/// Mode selector stored in `AppConfig::argument_print_mode`.
///
/// The `FromStr` impl in this file accepts `"full"`, `"keys"`, `"truncated"`
/// and `"hidden"` in any letter case; the `Default` impl yields `Truncated`.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
// NOTE(review): the per-variant meanings below are inferred from the names
// only; the code that consumes this enum is not in this file — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum ArgumentPrintMode {
/// Presumably prints argument names and complete values — TODO confirm.
Full,
/// Presumably prints argument names only — TODO confirm.
Keys,
/// Presumably prints values cut to `AppConfig::truncate_arguments_length` — TODO confirm.
Truncated,
/// Presumably prints no argument information — TODO confirm.
Hidden,
}
impl Default for ArgumentPrintMode {
/// Returns `ArgumentPrintMode::Truncated`, the same value `AppConfig` uses as its default `argument_print_mode`.
fn default() -> Self {
Self::Truncated
}
}
impl std::str::FromStr for ArgumentPrintMode {
    type Err = ConfigParseError;

    /// Parses a print-mode name, ignoring letter case.
    ///
    /// # Errors
    ///
    /// Returns a `ConfigParseError` quoting the input exactly as given when
    /// it is not one of `full`, `keys`, `truncated` or `hidden`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_lowercase();
        let mode = match lowered.as_str() {
            "full" => Self::Full,
            "keys" => Self::Keys,
            "truncated" => Self::Truncated,
            "hidden" => Self::Hidden,
            // Report the caller's original spelling rather than the lowercased copy.
            _ => return Err(ConfigParseError(format!("invalid ArgumentPrintMode: {s}"))),
        };
        Ok(mode)
    }
}
/// Per-task settings; serializable with serde.
///
/// The `Default` impl in this file turns every feature off (zero retries,
/// `Unlimited` concurrency, all flags `false`) and sets `parallel_batch_size`
/// to 100.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with
/// a struct literal.
// NOTE(review): the field meanings below are inferred from the names; the code
// that consumes them is not in this file — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct TaskConfig {
/// Retry limit; defaults to 0.
pub max_retries: u32,
/// Presumably the error names that qualify for a retry; defaults to empty.
pub retry_for_errors: Vec<String>,
/// Concurrency control mode; defaults to `ConcurrencyControlType::Unlimited`.
pub concurrency_control: ConcurrencyControlType,
/// Optional numeric limit; defaults to `None`.
pub running_concurrency: Option<u32>,
/// Concurrency control mode for registration; defaults to `ConcurrencyControlType::Unlimited`.
pub registration_concurrency: ConcurrencyControlType,
/// Defaults to `false`.
pub cache_results: bool,
/// Presumably the argument names that form a cache/concurrency key; defaults to empty.
pub key_arguments: Vec<String>,
/// Presumably argument names that disable caching; defaults to empty.
pub disable_cache_args: Vec<String>,
/// Defaults to `false`.
pub on_diff_non_key_args_raise: bool,
/// Defaults to 100.
pub parallel_batch_size: usize,
/// Defaults to `false`.
pub force_new_workflow: bool,
/// Defaults to `false`.
pub reroute_on_cc: bool,
/// Defaults to `false`.
pub blocking: bool,
}
impl Default for TaskConfig {
    /// Builds the baseline task configuration: no retries, unlimited
    /// concurrency, caching off, batches of 100, every flag `false`.
    fn default() -> Self {
        Self {
            // Retry behaviour.
            max_retries: 0,
            retry_for_errors: vec![],

            // Concurrency control.
            concurrency_control: ConcurrencyControlType::Unlimited,
            running_concurrency: Option::None,
            registration_concurrency: ConcurrencyControlType::Unlimited,

            // Result caching and argument handling.
            cache_results: false,
            key_arguments: vec![],
            disable_cache_args: vec![],
            on_diff_non_key_args_raise: false,

            // Batching and workflow flags.
            parallel_batch_size: 100,
            force_new_workflow: false,
            reroute_on_cc: false,
            blocking: false,
        }
    }
}
// Application-wide settings, populated through the `cistell_core::Config`
// derive and also serializable with serde.
//
// Each `#[config(default = ...)]` value below matches the hand-written
// `Default` impl in this file; change both together.
//
// NOTE(review): `prefix = "RUSTVELLO"` and `group = "app"` are presumably the
// namespace the derive uses when looking values up (e.g. environment variable
// names) — confirm against the cistell_core documentation.
//
// Plain `//` comments are used on purpose: `///` doc comments become `#[doc]`
// attributes, which would be visible to the `Config` derive.
#[derive(Config, Debug, Clone, Serialize, Deserialize)]
#[config(prefix = "RUSTVELLO", group = "app")]
#[non_exhaustive]
pub struct AppConfig {
// Application identifier; `AppConfig::new` overrides only this field.
#[config(default = "rustvello")]
pub app_id: String,
// NOTE(review): presumably forces synchronous execution for development — confirm.
#[config(default = false)]
pub dev_mode_force_sync: bool,
// Timing settings in seconds (per the field names).
#[config(default = 300u64)]
pub max_pending_seconds: u64,
#[config(default = 30u64)]
pub heartbeat_interval_seconds: u64,
#[config(default = 300u64)]
pub runner_dead_after_seconds: u64,
#[config(default = 60u64)]
pub recovery_check_interval_seconds: u64,
// Argument printing: on/off switch, mode, and a length used for truncation.
#[config(default = true)]
pub print_arguments: bool,
#[config(default = ArgumentPrintMode::Truncated)]
pub argument_print_mode: ArgumentPrintMode,
#[config(default = 32usize)]
pub truncate_arguments_length: usize,
// Five-field cron expressions: every 5 minutes and every 15 minutes by default.
#[config(default = "*/5 * * * *")]
pub recover_pending_cron: String,
#[config(default = "*/15 * * * *")]
pub recover_running_cron: String,
// Empty by default.
#[config(default = vec![])]
pub trigger_task_modules: Vec<String>,
// NOTE(review): 0.0 presumably disables status caching — confirm.
#[config(default = 0.0f64)]
pub cached_status_time_seconds: f64,
// Logging settings. `logging_level` is a free-form string here; it is not
// validated in this file.
#[config(default = "info")]
pub logging_level: String,
#[config(default = LogFormat::Text)]
pub log_format: LogFormat,
// No `#[config(default)]`: the hand-written `Default` leaves this as `None`.
// NOTE(review): `None` presumably means "auto-detect" — confirm.
pub log_use_colors: Option<bool>,
#[config(default = true)]
pub compact_log_context: bool,
#[config(default = true)]
pub blocking_control: bool,
// NOTE(review): 0.0 presumably disables automatic purging — confirm.
#[config(default = 0.0f64)]
pub auto_final_invocation_purge_hours: f64,
// Scheduler settings.
#[config(default = 60u64)]
pub scheduler_interval_seconds: u64,
#[config(default = true)]
pub enable_scheduler: bool,
// Atomic-service timing, in minutes (per the field names).
#[config(default = 5.0f64)]
pub atomic_service_interval_minutes: f64,
#[config(default = 1.0f64)]
pub atomic_service_spread_margin_minutes: f64,
#[config(default = 0.5f64)]
pub atomic_service_check_interval_minutes: f64,
}
impl Default for AppConfig {
    /// Builds the stock application configuration.
    ///
    /// The values repeat the `#[config(default = ...)]` attributes on the
    /// struct fields; `log_use_colors`, which has no such attribute, is `None`.
    fn default() -> Self {
        Self {
            app_id: String::from("rustvello"),
            dev_mode_force_sync: false,

            // Timing, in seconds.
            max_pending_seconds: 300_u64,
            heartbeat_interval_seconds: 30_u64,
            runner_dead_after_seconds: 300_u64,
            recovery_check_interval_seconds: 60_u64,

            // Argument printing.
            print_arguments: true,
            argument_print_mode: ArgumentPrintMode::Truncated,
            truncate_arguments_length: 32_usize,

            // Recovery schedules and trigger modules.
            recover_pending_cron: String::from("*/5 * * * *"),
            recover_running_cron: String::from("*/15 * * * *"),
            trigger_task_modules: vec![],
            cached_status_time_seconds: 0.0_f64,

            // Logging.
            logging_level: String::from("info"),
            log_format: LogFormat::Text,
            log_use_colors: Option::None,
            compact_log_context: true,

            blocking_control: true,
            auto_final_invocation_purge_hours: 0.0_f64,

            // Scheduler.
            scheduler_interval_seconds: 60_u64,
            enable_scheduler: true,

            // Atomic service timing, in minutes.
            atomic_service_interval_minutes: 5.0_f64,
            atomic_service_spread_margin_minutes: 1.0_f64,
            atomic_service_check_interval_minutes: 0.5_f64,
        }
    }
}
impl AppConfig {
    /// Creates a configuration with the given application id and every other
    /// field taken from `AppConfig::default()`.
    pub fn new(app_id: impl Into<String>) -> Self {
        // Convert first so the id is evaluated before the defaults are built,
        // exactly as in a struct literal with the field written inline.
        let app_id = app_id.into();
        Self {
            app_id,
            ..Self::default()
        }
    }
}
/// Settings for the client data store; serializable with serde.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with
/// a struct literal; start from `ClientDataStoreConfig::default()`.
// NOTE(review): units and the meaning of zero values are not established in
// this file; the notes below are assumptions to confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ClientDataStoreConfig {
/// Defaults to `false`.
pub disabled: bool,
/// Defaults to 1024 (presumably bytes).
pub min_size_to_cache: usize,
/// Defaults to 0 (presumably "no upper limit" — TODO confirm).
pub max_size_to_cache: usize,
/// Defaults to 1024 (presumably an entry count — TODO confirm).
pub local_cache_size: usize,
/// Defaults to 10_485_760, i.e. 10 * 1024 * 1024 (presumably bytes).
pub warn_threshold: usize,
}
impl Default for ClientDataStoreConfig {
    /// Builds the stock data-store configuration: enabled, with 1024 for both
    /// `min_size_to_cache` and `local_cache_size`, 0 for `max_size_to_cache`,
    /// and 10_485_760 for `warn_threshold`.
    fn default() -> Self {
        let one_kib: usize = 1 << 10;
        Self {
            disabled: false,
            min_size_to_cache: one_kib,
            max_size_to_cache: 0,
            local_cache_size: one_kib,
            // 10 * 1024 * 1024 == 10_485_760
            warn_threshold: 10 * one_kib * one_kib,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `TaskConfig::default()` starts with retries, limits, caching and
    /// workflow flags all switched off.
    #[test]
    fn task_config_defaults() {
        let defaults = TaskConfig::default();

        assert_eq!(defaults.max_retries, 0);
        assert_eq!(
            defaults.concurrency_control,
            ConcurrencyControlType::Unlimited
        );
        assert!(defaults.running_concurrency.is_none());
        assert_eq!(
            defaults.registration_concurrency,
            ConcurrencyControlType::Unlimited
        );
        assert!(!defaults.cache_results);
        assert!(defaults.key_arguments.is_empty());
        assert!(!defaults.force_new_workflow);
        assert!(!defaults.reroute_on_cc);
    }

    /// `AppConfig::default()` exposes the documented id and timing values.
    #[test]
    fn app_config_defaults() {
        let defaults = AppConfig::default();

        assert_eq!(defaults.app_id, "rustvello");
        assert!(!defaults.dev_mode_force_sync);
        assert_eq!(defaults.max_pending_seconds, 300);
        assert_eq!(defaults.heartbeat_interval_seconds, 30);
    }

    /// `AppConfig::new` replaces the id and keeps the remaining defaults.
    #[test]
    fn app_config_new() {
        let config = AppConfig::new("my-app");

        assert_eq!(config.app_id, "my-app");
        assert!(!config.dev_mode_force_sync);
    }

    /// A fully customised `TaskConfig` survives a JSON round trip.
    #[test]
    fn serde_round_trip_task_config() {
        let original = TaskConfig {
            max_retries: 3,
            retry_for_errors: vec![String::from("TimeoutError")],
            concurrency_control: ConcurrencyControlType::Task,
            running_concurrency: Some(5),
            registration_concurrency: ConcurrencyControlType::Argument,
            cache_results: true,
            key_arguments: vec![String::from("order_id")],
            disable_cache_args: vec![String::from("timestamp")],
            on_diff_non_key_args_raise: true,
            parallel_batch_size: 50,
            force_new_workflow: true,
            reroute_on_cc: true,
            blocking: false,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: TaskConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.max_retries, 3);
        assert_eq!(decoded.concurrency_control, ConcurrencyControlType::Task);
        assert_eq!(decoded.running_concurrency, Some(5));
        assert_eq!(
            decoded.registration_concurrency,
            ConcurrencyControlType::Argument
        );
        assert!(decoded.cache_results);
        assert_eq!(decoded.key_arguments, vec!["order_id"]);
        assert!(decoded.force_new_workflow);
        assert!(decoded.reroute_on_cc);
        assert!(!decoded.blocking);
    }

    /// An `AppConfig` keeps its id across a JSON round trip.
    #[test]
    fn serde_round_trip_app_config() {
        let original = AppConfig::new("test");

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: AppConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.app_id, "test");
    }

    /// `ClientDataStoreConfig::default()` exposes the expected sizes.
    #[test]
    fn client_data_store_config_defaults() {
        let defaults = ClientDataStoreConfig::default();

        assert!(!defaults.disabled);
        assert_eq!(defaults.min_size_to_cache, 1024);
        assert_eq!(defaults.max_size_to_cache, 0);
        assert_eq!(defaults.local_cache_size, 1024);
        assert_eq!(defaults.warn_threshold, 10_485_760);
    }

    /// A customised `ClientDataStoreConfig` survives a JSON round trip.
    #[test]
    fn serde_round_trip_client_data_store_config() {
        let original = ClientDataStoreConfig {
            disabled: true,
            min_size_to_cache: 512,
            max_size_to_cache: 1_000_000,
            local_cache_size: 256,
            warn_threshold: 5_000_000,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ClientDataStoreConfig = serde_json::from_str(&encoded).unwrap();

        assert!(decoded.disabled);
        assert_eq!(decoded.min_size_to_cache, 512);
        assert_eq!(decoded.max_size_to_cache, 1_000_000);
        assert_eq!(decoded.local_cache_size, 256);
        assert_eq!(decoded.warn_threshold, 5_000_000);
    }
}