1use cistell_core::Config;
2use serde::{Deserialize, Serialize};
3
4use crate::status::ConcurrencyControlType;
5
6#[derive(Debug)]
8pub struct ConfigParseError(pub String);
9
10impl std::fmt::Display for ConfigParseError {
11 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
12 write!(f, "{}", self.0)
13 }
14}
15
16impl std::error::Error for ConfigParseError {}
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[non_exhaustive]
21pub enum LogFormat {
22 Text,
24 Json,
26}
27
28impl Default for LogFormat {
29 fn default() -> Self {
30 Self::Text
31 }
32}
33
34impl std::str::FromStr for LogFormat {
35 type Err = ConfigParseError;
36 fn from_str(s: &str) -> Result<Self, Self::Err> {
37 match s.to_lowercase().as_str() {
38 "text" => Ok(Self::Text),
39 "json" => Ok(Self::Json),
40 _ => Err(ConfigParseError(format!("invalid LogFormat: {s}"))),
41 }
42 }
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47#[non_exhaustive]
48pub enum ArgumentPrintMode {
49 Full,
51 Keys,
53 Truncated,
55 Hidden,
57}
58
59impl Default for ArgumentPrintMode {
60 fn default() -> Self {
61 Self::Truncated
62 }
63}
64
65impl std::str::FromStr for ArgumentPrintMode {
66 type Err = ConfigParseError;
67 fn from_str(s: &str) -> Result<Self, Self::Err> {
68 match s.to_lowercase().as_str() {
69 "full" => Ok(Self::Full),
70 "keys" => Ok(Self::Keys),
71 "truncated" => Ok(Self::Truncated),
72 "hidden" => Ok(Self::Hidden),
73 _ => Err(ConfigParseError(format!("invalid ArgumentPrintMode: {s}"))),
74 }
75 }
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
83#[non_exhaustive]
84pub struct TaskConfig {
85 pub max_retries: u32,
87 pub retry_for_errors: Vec<String>,
90 pub concurrency_control: ConcurrencyControlType,
92 pub running_concurrency: Option<u32>,
94 pub registration_concurrency: ConcurrencyControlType,
96 pub cache_results: bool,
98 pub key_arguments: Vec<String>,
100 pub disable_cache_args: Vec<String>,
102 pub on_diff_non_key_args_raise: bool,
105 pub parallel_batch_size: usize,
107 pub force_new_workflow: bool,
109 pub reroute_on_cc: bool,
111 pub blocking: bool,
114}
115
116impl Default for TaskConfig {
117 fn default() -> Self {
118 Self {
119 max_retries: 0,
120 retry_for_errors: Vec::new(),
121 concurrency_control: ConcurrencyControlType::Unlimited,
122 running_concurrency: None,
123 registration_concurrency: ConcurrencyControlType::Unlimited,
124 cache_results: false,
125 key_arguments: Vec::new(),
126 disable_cache_args: Vec::new(),
127 on_diff_non_key_args_raise: false,
128 parallel_batch_size: 100,
129 force_new_workflow: false,
130 reroute_on_cc: false,
131 blocking: false,
132 }
133 }
134}
135
136#[derive(Config, Debug, Clone, Serialize, Deserialize)]
138#[config(prefix = "RUSTVELLO", group = "app")]
139#[non_exhaustive]
140pub struct AppConfig {
141 #[config(default = "rustvello")]
143 pub app_id: String,
144 #[config(default = false)]
146 pub dev_mode_force_sync: bool,
147 #[config(default = 300u64)]
149 pub max_pending_seconds: u64,
150 #[config(default = 30u64)]
152 pub heartbeat_interval_seconds: u64,
153 #[config(default = 300u64)]
155 pub runner_dead_after_seconds: u64,
156 #[config(default = 60u64)]
158 pub recovery_check_interval_seconds: u64,
159 #[config(default = true)]
161 pub print_arguments: bool,
162 #[config(default = ArgumentPrintMode::Truncated)]
164 pub argument_print_mode: ArgumentPrintMode,
165 #[config(default = 32usize)]
167 pub truncate_arguments_length: usize,
168 #[config(default = "*/5 * * * *")]
170 pub recover_pending_cron: String,
171 #[config(default = "*/15 * * * *")]
173 pub recover_running_cron: String,
174 #[config(default = vec![])]
176 pub trigger_task_modules: Vec<String>,
177 #[config(default = 0.0f64)]
179 pub cached_status_time_seconds: f64,
180 #[config(default = "info")]
182 pub logging_level: String,
183 #[config(default = LogFormat::Text)]
185 pub log_format: LogFormat,
186 pub log_use_colors: Option<bool>,
188 #[config(default = true)]
190 pub compact_log_context: bool,
191 #[config(default = true)]
193 pub blocking_control: bool,
194 #[config(default = 0.0f64)]
196 pub auto_final_invocation_purge_hours: f64,
197 #[config(default = 60u64)]
199 pub scheduler_interval_seconds: u64,
200 #[config(default = true)]
202 pub enable_scheduler: bool,
203 #[config(default = 5.0f64)]
206 pub atomic_service_interval_minutes: f64,
207 #[config(default = 1.0f64)]
210 pub atomic_service_spread_margin_minutes: f64,
211 #[config(default = 0.5f64)]
214 pub atomic_service_check_interval_minutes: f64,
215}
216
217impl Default for AppConfig {
218 fn default() -> Self {
219 Self {
220 app_id: "rustvello".to_string(),
221 dev_mode_force_sync: false,
222 max_pending_seconds: 300,
223 heartbeat_interval_seconds: 30,
224 runner_dead_after_seconds: 300,
225 recovery_check_interval_seconds: 60,
226 print_arguments: true,
227 argument_print_mode: ArgumentPrintMode::Truncated,
228 truncate_arguments_length: 32,
229 recover_pending_cron: "*/5 * * * *".to_owned(),
230 recover_running_cron: "*/15 * * * *".to_owned(),
231 trigger_task_modules: Vec::new(),
232 cached_status_time_seconds: 0.0,
233 logging_level: "info".to_string(),
234 log_format: LogFormat::Text,
235 log_use_colors: None,
236 compact_log_context: true,
237 blocking_control: true,
238 auto_final_invocation_purge_hours: 0.0,
239 scheduler_interval_seconds: 60,
240 enable_scheduler: true,
241 atomic_service_interval_minutes: 5.0,
242 atomic_service_spread_margin_minutes: 1.0,
243 atomic_service_check_interval_minutes: 0.5,
244 }
245 }
246}
247
248impl AppConfig {
249 pub fn new(app_id: impl Into<String>) -> Self {
250 Self {
251 app_id: app_id.into(),
252 ..Default::default()
253 }
254 }
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
264#[non_exhaustive]
265pub struct ClientDataStoreConfig {
266 pub disabled: bool,
268 pub min_size_to_cache: usize,
271 pub max_size_to_cache: usize,
275 pub local_cache_size: usize,
277 pub warn_threshold: usize,
279}
280
281impl Default for ClientDataStoreConfig {
282 fn default() -> Self {
283 Self {
284 disabled: false,
285 min_size_to_cache: 1024,
286 max_size_to_cache: 0,
287 local_cache_size: 1024,
288 warn_threshold: 10_485_760,
289 }
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296
297 #[test]
298 fn task_config_defaults() {
299 let tc = TaskConfig::default();
300 assert_eq!(tc.max_retries, 0);
301 assert_eq!(tc.concurrency_control, ConcurrencyControlType::Unlimited);
302 assert!(tc.running_concurrency.is_none());
303 assert_eq!(
304 tc.registration_concurrency,
305 ConcurrencyControlType::Unlimited
306 );
307 assert!(!tc.cache_results);
308 assert!(tc.key_arguments.is_empty());
309 assert!(!tc.force_new_workflow);
310 assert!(!tc.reroute_on_cc);
311 }
312
313 #[test]
314 fn app_config_defaults() {
315 let ac = AppConfig::default();
316 assert_eq!(ac.app_id, "rustvello");
317 assert!(!ac.dev_mode_force_sync);
318 assert_eq!(ac.max_pending_seconds, 300);
319 assert_eq!(ac.heartbeat_interval_seconds, 30);
320 }
321
322 #[test]
323 fn app_config_new() {
324 let ac = AppConfig::new("my-app");
325 assert_eq!(ac.app_id, "my-app");
326 assert!(!ac.dev_mode_force_sync);
327 }
328
329 #[test]
330 fn serde_round_trip_task_config() {
331 let tc = TaskConfig {
332 max_retries: 3,
333 retry_for_errors: vec!["TimeoutError".to_string()],
334 concurrency_control: ConcurrencyControlType::Task,
335 running_concurrency: Some(5),
336 registration_concurrency: ConcurrencyControlType::Argument,
337 cache_results: true,
338 key_arguments: vec!["order_id".to_string()],
339 disable_cache_args: vec!["timestamp".to_string()],
340 on_diff_non_key_args_raise: true,
341 parallel_batch_size: 50,
342 force_new_workflow: true,
343 reroute_on_cc: true,
344 blocking: false,
345 };
346 let json = serde_json::to_string(&tc).unwrap();
347 let back: TaskConfig = serde_json::from_str(&json).unwrap();
348 assert_eq!(back.max_retries, 3);
349 assert_eq!(back.concurrency_control, ConcurrencyControlType::Task);
350 assert_eq!(back.running_concurrency, Some(5));
351 assert_eq!(
352 back.registration_concurrency,
353 ConcurrencyControlType::Argument
354 );
355 assert!(back.cache_results);
356 assert_eq!(back.key_arguments, vec!["order_id"]);
357 assert!(back.force_new_workflow);
358 assert!(back.reroute_on_cc);
359 assert!(!back.blocking);
360 }
361
362 #[test]
363 fn serde_round_trip_app_config() {
364 let ac = AppConfig::new("test");
365 let json = serde_json::to_string(&ac).unwrap();
366 let back: AppConfig = serde_json::from_str(&json).unwrap();
367 assert_eq!(back.app_id, "test");
368 }
369
370 #[test]
371 fn client_data_store_config_defaults() {
372 let c = ClientDataStoreConfig::default();
373 assert!(!c.disabled);
374 assert_eq!(c.min_size_to_cache, 1024);
375 assert_eq!(c.max_size_to_cache, 0);
376 assert_eq!(c.local_cache_size, 1024);
377 assert_eq!(c.warn_threshold, 10_485_760);
378 }
379
380 #[test]
381 fn serde_round_trip_client_data_store_config() {
382 let c = ClientDataStoreConfig {
383 disabled: true,
384 min_size_to_cache: 512,
385 max_size_to_cache: 1_000_000,
386 local_cache_size: 256,
387 warn_threshold: 5_000_000,
388 };
389 let json = serde_json::to_string(&c).unwrap();
390 let back: ClientDataStoreConfig = serde_json::from_str(&json).unwrap();
391 assert!(back.disabled);
392 assert_eq!(back.min_size_to_cache, 512);
393 assert_eq!(back.max_size_to_cache, 1_000_000);
394 assert_eq!(back.local_cache_size, 256);
395 assert_eq!(back.warn_threshold, 5_000_000);
396 }
397}