Skip to main content

rustvello_proto/
config.rs

1use cistell_core::Config;
2use serde::{Deserialize, Serialize};
3
4use crate::status::ConcurrencyControlType;
5
/// Error returned when a configuration enum cannot be parsed from a string.
///
/// The wrapped `String` is the complete, human-readable message; the
/// [`Display`](std::fmt::Display) impl prints it verbatim.
///
/// `Clone`, `PartialEq` and `Eq` are derived so parse results can be stored,
/// duplicated and compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigParseError(pub String);

impl std::fmt::Display for ConfigParseError {
    /// Writes the wrapped message unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same output as `write!(f, "{}", self.0)` without the formatting machinery.
        f.write_str(&self.0)
    }
}

// The message carries all the context; there is no underlying source error.
impl std::error::Error for ConfigParseError {}
17
18/// Log output format.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[non_exhaustive]
21pub enum LogFormat {
22    /// Human-readable text with optional ANSI colors.
23    Text,
24    /// Structured JSON (one object per line).
25    Json,
26}
27
28impl Default for LogFormat {
29    fn default() -> Self {
30        Self::Text
31    }
32}
33
34impl std::str::FromStr for LogFormat {
35    type Err = ConfigParseError;
36    fn from_str(s: &str) -> Result<Self, Self::Err> {
37        match s.to_lowercase().as_str() {
38            "text" => Ok(Self::Text),
39            "json" => Ok(Self::Json),
40            _ => Err(ConfigParseError(format!("invalid LogFormat: {s}"))),
41        }
42    }
43}
44
45/// Controls how task arguments are displayed in logs and monitoring.
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47#[non_exhaustive]
48pub enum ArgumentPrintMode {
49    /// Show complete argument values.
50    Full,
51    /// Show only argument names (keys).
52    Keys,
53    /// Show truncated argument values (up to `truncate_arguments_length`).
54    Truncated,
55    /// Hide all argument values.
56    Hidden,
57}
58
59impl Default for ArgumentPrintMode {
60    fn default() -> Self {
61        Self::Truncated
62    }
63}
64
65impl std::str::FromStr for ArgumentPrintMode {
66    type Err = ConfigParseError;
67    fn from_str(s: &str) -> Result<Self, Self::Err> {
68        match s.to_lowercase().as_str() {
69            "full" => Ok(Self::Full),
70            "keys" => Ok(Self::Keys),
71            "truncated" => Ok(Self::Truncated),
72            "hidden" => Ok(Self::Hidden),
73            _ => Err(ConfigParseError(format!("invalid ArgumentPrintMode: {s}"))),
74        }
75    }
76}
77
/// Per-task configuration options.
///
/// Mirrors pynenc's `ConfigTask` with settings for retries, concurrency,
/// and execution behavior.
///
/// The `Default` impl in this file produces a task with no retries,
/// `ConcurrencyControlType::Unlimited` for both concurrency settings, result
/// caching disabled, and a `parallel_batch_size` of 100.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct TaskConfig {
    /// Maximum number of retry attempts (`0` = no retries).
    pub max_retries: u32,
    /// Error type names that should trigger a retry.
    /// Empty means only an explicit `RetryError` triggers retries.
    pub retry_for_errors: Vec<String>,
    /// Concurrency control strategy for running invocations.
    pub concurrency_control: ConcurrencyControlType,
    /// Maximum number of concurrent invocations (when using Task concurrency).
    /// `None` leaves the limit unset.
    pub running_concurrency: Option<u32>,
    /// Concurrency control strategy applied at registration time.
    pub registration_concurrency: ConcurrencyControlType,
    /// Whether to cache results for identical arguments.
    pub cache_results: bool,
    /// Parameter names used as concurrency keys (when using Task concurrency).
    pub key_arguments: Vec<String>,
    /// Argument names to exclude from cache key computation.
    pub disable_cache_args: Vec<String>,
    /// Raise an error when a call with matching key args but different
    /// non-key args is registered (prevents silent overwrites).
    pub on_diff_non_key_args_raise: bool,
    /// Batch size for `parallelize()` — how many calls to submit at once.
    pub parallel_batch_size: usize,
    /// Force a new workflow even if a matching invocation already exists.
    pub force_new_workflow: bool,
    /// Reroute an invocation when it hits concurrency control limits.
    pub reroute_on_cc: bool,
    /// Whether to run this task on a blocking thread (`tokio::task::spawn_blocking`).
    /// Use for CPU-bound or synchronous I/O tasks that could starve the async executor.
    pub blocking: bool,
}
115
116impl Default for TaskConfig {
117    fn default() -> Self {
118        Self {
119            max_retries: 0,
120            retry_for_errors: Vec::new(),
121            concurrency_control: ConcurrencyControlType::Unlimited,
122            running_concurrency: None,
123            registration_concurrency: ConcurrencyControlType::Unlimited,
124            cache_results: false,
125            key_arguments: Vec::new(),
126            disable_cache_args: Vec::new(),
127            on_diff_non_key_args_raise: false,
128            parallel_batch_size: 100,
129            force_new_workflow: false,
130            reroute_on_cc: false,
131            blocking: false,
132        }
133    }
134}
135
/// Global application configuration.
///
/// Every field except `log_use_colors` declares its default through a
/// `#[config(default = ...)]` attribute, and the hand-written `Default` impl
/// in this file repeats the same values.
/// NOTE(review): the two sets of defaults are maintained by hand — keep them
/// in sync when changing either.
///
/// The `Config` derive and the `#[config(...)]` attributes come from
/// `cistell_core`; presumably `prefix`/`group` namespace the external
/// settings this struct is loaded from — confirm against the cistell docs.
#[derive(Config, Debug, Clone, Serialize, Deserialize)]
#[config(prefix = "RUSTVELLO", group = "app")]
#[non_exhaustive]
pub struct AppConfig {
    /// Unique identifier for this application instance.
    #[config(default = "rustvello")]
    pub app_id: String,
    /// Force synchronous task execution (for development/testing).
    #[config(default = false)]
    pub dev_mode_force_sync: bool,
    /// Maximum time an invocation can be pending before recovery (seconds).
    #[config(default = 300u64)]
    pub max_pending_seconds: u64,
    /// Heartbeat interval for runners (seconds).
    #[config(default = 30u64)]
    pub heartbeat_interval_seconds: u64,
    /// Runners without a heartbeat for this long are considered dead (seconds).
    #[config(default = 300u64)]
    pub runner_dead_after_seconds: u64,
    /// How often to scan for stale invocations (seconds).
    #[config(default = 60u64)]
    pub recovery_check_interval_seconds: u64,
    /// Whether to display arguments in logs.
    #[config(default = true)]
    pub print_arguments: bool,
    /// How arguments are displayed in logs/monitoring.
    #[config(default = ArgumentPrintMode::Truncated)]
    pub argument_print_mode: ArgumentPrintMode,
    /// Maximum length for truncated argument display.
    #[config(default = 32usize)]
    pub truncate_arguments_length: usize,
    /// Cron expression for recovering pending invocations (empty = disabled).
    #[config(default = "*/5 * * * *")]
    pub recover_pending_cron: String,
    /// Cron expression for recovering running invocations from dead runners (empty = disabled).
    #[config(default = "*/15 * * * *")]
    pub recover_running_cron: String,
    /// Module paths to scan for trigger task registration.
    #[config(default = vec![])]
    pub trigger_task_modules: Vec<String>,
    /// TTL for cached invocation status checks (seconds, 0 = no cache).
    #[config(default = 0.0f64)]
    pub cached_status_time_seconds: f64,
    /// Logging level (trace, debug, info, warn, error).
    #[config(default = "info")]
    pub logging_level: String,
    /// Log output format (`LogFormat::Text` or `LogFormat::Json`).
    #[config(default = LogFormat::Text)]
    pub log_format: LogFormat,
    /// Whether to use ANSI colors in text format (`None` = auto-detect TTY).
    /// This is the only field without a `#[config(default = ...)]` attribute.
    pub log_use_colors: Option<bool>,
    /// Whether to show compact context (abbreviated names, truncated IDs).
    #[config(default = true)]
    pub compact_log_context: bool,
    /// Blocking control — whether orchestrator uses blocking/waiting semantics.
    #[config(default = true)]
    pub blocking_control: bool,
    /// Hours after which final invocations can be auto-purged (0 = disabled).
    #[config(default = 0.0f64)]
    pub auto_final_invocation_purge_hours: f64,
    /// Scheduler evaluation interval in seconds (trigger scheduler).
    #[config(default = 60u64)]
    pub scheduler_interval_seconds: u64,
    /// Whether the scheduler is enabled.
    #[config(default = true)]
    pub enable_scheduler: bool,
    /// Total cycle interval for atomic global services (triggers, recovery, etc.)
    /// in minutes. The interval is divided equally among all active runners.
    #[config(default = 5.0f64)]
    pub atomic_service_interval_minutes: f64,
    /// Safety margin (minutes) subtracted from each runner's time slot to prevent
    /// overlapping execution of atomic services across distributed runners.
    #[config(default = 1.0f64)]
    pub atomic_service_spread_margin_minutes: f64,
    /// How frequently an individual runner checks if it should execute atomic
    /// global services (minutes). Should be less than `atomic_service_interval_minutes`.
    #[config(default = 0.5f64)]
    pub atomic_service_check_interval_minutes: f64,
}
216
217impl Default for AppConfig {
218    fn default() -> Self {
219        Self {
220            app_id: "rustvello".to_string(),
221            dev_mode_force_sync: false,
222            max_pending_seconds: 300,
223            heartbeat_interval_seconds: 30,
224            runner_dead_after_seconds: 300,
225            recovery_check_interval_seconds: 60,
226            print_arguments: true,
227            argument_print_mode: ArgumentPrintMode::Truncated,
228            truncate_arguments_length: 32,
229            recover_pending_cron: "*/5 * * * *".to_owned(),
230            recover_running_cron: "*/15 * * * *".to_owned(),
231            trigger_task_modules: Vec::new(),
232            cached_status_time_seconds: 0.0,
233            logging_level: "info".to_string(),
234            log_format: LogFormat::Text,
235            log_use_colors: None,
236            compact_log_context: true,
237            blocking_control: true,
238            auto_final_invocation_purge_hours: 0.0,
239            scheduler_interval_seconds: 60,
240            enable_scheduler: true,
241            atomic_service_interval_minutes: 5.0,
242            atomic_service_spread_margin_minutes: 1.0,
243            atomic_service_check_interval_minutes: 0.5,
244        }
245    }
246}
247
248impl AppConfig {
249    pub fn new(app_id: impl Into<String>) -> Self {
250        Self {
251            app_id: app_id.into(),
252            ..Default::default()
253        }
254    }
255}
256
/// Configuration for the client data store subsystem.
///
/// Controls when serialized values are stored externally vs. returned inline,
/// local LRU caching behavior, and size monitoring thresholds.
///
/// Mirrors pynenc's `ConfigClientDataStore`.
///
/// The `Default` impl in this file enables the store with a 1024-byte
/// minimum, no maximum, 1024 local cache entries and a 10_485_760-byte
/// warning threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct ClientDataStoreConfig {
    /// Bypass entirely — always return inline serialized strings.
    pub disabled: bool,
    /// Minimum serialized string length (bytes) to store externally.
    /// Values shorter than this are returned inline.
    pub min_size_to_cache: usize,
    /// Maximum serialized string length (bytes) to store externally.
    /// Values longer than this are returned inline with a warning.
    /// `0` = unlimited (no maximum).
    pub max_size_to_cache: usize,
    /// Maximum number of entries in the process-local LRU cache.
    pub local_cache_size: usize,
    /// Log a warning when any single value exceeds this size (bytes).
    pub warn_threshold: usize,
}
280
281impl Default for ClientDataStoreConfig {
282    fn default() -> Self {
283        Self {
284            disabled: false,
285            min_size_to_cache: 1024,
286            max_size_to_cache: 0,
287            local_cache_size: 1024,
288            warn_threshold: 10_485_760,
289        }
290    }
291}
292
/// Unit tests for the configuration types: default values, the
/// case-insensitive `FromStr` impls, and serde JSON round trips.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn task_config_defaults() {
        let tc = TaskConfig::default();
        assert_eq!(tc.max_retries, 0);
        assert!(tc.retry_for_errors.is_empty());
        assert_eq!(tc.concurrency_control, ConcurrencyControlType::Unlimited);
        assert!(tc.running_concurrency.is_none());
        assert_eq!(
            tc.registration_concurrency,
            ConcurrencyControlType::Unlimited
        );
        assert!(!tc.cache_results);
        assert!(tc.key_arguments.is_empty());
        assert!(tc.disable_cache_args.is_empty());
        assert!(!tc.on_diff_non_key_args_raise);
        assert_eq!(tc.parallel_batch_size, 100);
        assert!(!tc.force_new_workflow);
        assert!(!tc.reroute_on_cc);
        assert!(!tc.blocking);
    }

    #[test]
    fn app_config_defaults() {
        let ac = AppConfig::default();
        assert_eq!(ac.app_id, "rustvello");
        assert!(!ac.dev_mode_force_sync);
        assert_eq!(ac.max_pending_seconds, 300);
        assert_eq!(ac.heartbeat_interval_seconds, 30);
        assert_eq!(ac.runner_dead_after_seconds, 300);
        assert_eq!(ac.recovery_check_interval_seconds, 60);
        assert!(ac.print_arguments);
        assert_eq!(ac.argument_print_mode, ArgumentPrintMode::Truncated);
        assert_eq!(ac.truncate_arguments_length, 32);
        assert_eq!(ac.recover_pending_cron, "*/5 * * * *");
        assert_eq!(ac.recover_running_cron, "*/15 * * * *");
        assert!(ac.trigger_task_modules.is_empty());
        assert_eq!(ac.logging_level, "info");
        assert_eq!(ac.log_format, LogFormat::Text);
        assert!(ac.log_use_colors.is_none());
        assert!(ac.compact_log_context);
        assert!(ac.blocking_control);
        assert_eq!(ac.scheduler_interval_seconds, 60);
        assert!(ac.enable_scheduler);
    }

    #[test]
    fn app_config_new() {
        let ac = AppConfig::new("my-app");
        assert_eq!(ac.app_id, "my-app");
        assert!(!ac.dev_mode_force_sync);
    }

    #[test]
    fn enum_defaults() {
        assert_eq!(LogFormat::default(), LogFormat::Text);
        assert_eq!(ArgumentPrintMode::default(), ArgumentPrintMode::Truncated);
    }

    #[test]
    fn log_format_from_str_is_case_insensitive() {
        assert_eq!("text".parse::<LogFormat>().unwrap(), LogFormat::Text);
        assert_eq!("JSON".parse::<LogFormat>().unwrap(), LogFormat::Json);
        assert_eq!("TeXt".parse::<LogFormat>().unwrap(), LogFormat::Text);
    }

    #[test]
    fn argument_print_mode_from_str_is_case_insensitive() {
        assert_eq!(
            "full".parse::<ArgumentPrintMode>().unwrap(),
            ArgumentPrintMode::Full
        );
        assert_eq!(
            "KEYS".parse::<ArgumentPrintMode>().unwrap(),
            ArgumentPrintMode::Keys
        );
        assert_eq!(
            "Truncated".parse::<ArgumentPrintMode>().unwrap(),
            ArgumentPrintMode::Truncated
        );
        assert_eq!(
            "hidden".parse::<ArgumentPrintMode>().unwrap(),
            ArgumentPrintMode::Hidden
        );
    }

    #[test]
    fn from_str_rejects_unknown_values() {
        // The error message echoes the original, un-normalized input.
        let err = "YAML".parse::<LogFormat>().unwrap_err();
        assert_eq!(err.to_string(), "invalid LogFormat: YAML");

        let err = "Verbose".parse::<ArgumentPrintMode>().unwrap_err();
        assert_eq!(err.to_string(), "invalid ArgumentPrintMode: Verbose");

        assert!("".parse::<LogFormat>().is_err());
        assert!("".parse::<ArgumentPrintMode>().is_err());
    }

    #[test]
    fn serde_round_trip_task_config() {
        let tc = TaskConfig {
            max_retries: 3,
            retry_for_errors: vec!["TimeoutError".to_string()],
            concurrency_control: ConcurrencyControlType::Task,
            running_concurrency: Some(5),
            registration_concurrency: ConcurrencyControlType::Argument,
            cache_results: true,
            key_arguments: vec!["order_id".to_string()],
            disable_cache_args: vec!["timestamp".to_string()],
            on_diff_non_key_args_raise: true,
            parallel_batch_size: 50,
            force_new_workflow: true,
            reroute_on_cc: true,
            blocking: false,
        };
        let json = serde_json::to_string(&tc).unwrap();
        let back: TaskConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(back.max_retries, 3);
        assert_eq!(back.retry_for_errors, vec!["TimeoutError"]);
        assert_eq!(back.concurrency_control, ConcurrencyControlType::Task);
        assert_eq!(back.running_concurrency, Some(5));
        assert_eq!(
            back.registration_concurrency,
            ConcurrencyControlType::Argument
        );
        assert!(back.cache_results);
        assert_eq!(back.key_arguments, vec!["order_id"]);
        assert_eq!(back.disable_cache_args, vec!["timestamp"]);
        assert!(back.on_diff_non_key_args_raise);
        assert_eq!(back.parallel_batch_size, 50);
        assert!(back.force_new_workflow);
        assert!(back.reroute_on_cc);
        assert!(!back.blocking);
    }

    #[test]
    fn serde_round_trip_app_config() {
        let ac = AppConfig::new("test");
        let json = serde_json::to_string(&ac).unwrap();
        let back: AppConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(back.app_id, "test");
        assert_eq!(back.log_format, LogFormat::Text);
        assert_eq!(back.argument_print_mode, ArgumentPrintMode::Truncated);
    }

    #[test]
    fn client_data_store_config_defaults() {
        let c = ClientDataStoreConfig::default();
        assert!(!c.disabled);
        assert_eq!(c.min_size_to_cache, 1024);
        assert_eq!(c.max_size_to_cache, 0);
        assert_eq!(c.local_cache_size, 1024);
        assert_eq!(c.warn_threshold, 10_485_760);
    }

    #[test]
    fn serde_round_trip_client_data_store_config() {
        let c = ClientDataStoreConfig {
            disabled: true,
            min_size_to_cache: 512,
            max_size_to_cache: 1_000_000,
            local_cache_size: 256,
            warn_threshold: 5_000_000,
        };
        let json = serde_json::to_string(&c).unwrap();
        let back: ClientDataStoreConfig = serde_json::from_str(&json).unwrap();
        assert!(back.disabled);
        assert_eq!(back.min_size_to_cache, 512);
        assert_eq!(back.max_size_to_cache, 1_000_000);
        assert_eq!(back.local_cache_size, 256);
        assert_eq!(back.warn_threshold, 5_000_000);
    }
}