Skip to main content

celers_core/
config.rs

1//! Celery-compatible configuration for `CeleRS`
2//!
3//! This module provides configuration structures that are compatible with
4//! Python Celery's configuration format, making it easy to migrate from
5//! Celery to `CeleRS` or run them side-by-side.
6//!
7//! # Example
8//!
9//! ```rust
10//! use celers_core::config::{CeleryConfig, TaskConfig, BrokerTransport};
11//! use std::time::Duration;
12//!
13//! let config = CeleryConfig::default()
14//!     .with_broker_url("redis://localhost:6379/0")
15//!     .with_result_backend("redis://localhost:6379/1")
16//!     .with_task_serializer("json")
17//!     .with_timezone("UTC")
18//!     .with_worker_concurrency(4);
19//!
20//! assert_eq!(config.broker_url, "redis://localhost:6379/0");
21//! assert_eq!(config.worker_concurrency, 4);
22//! ```
23
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::time::Duration;
27
28/// Celery-compatible main configuration
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct CeleryConfig {
31    /// Broker connection URL (`CELERY_BROKER_URL`)
32    pub broker_url: String,
33
34    /// Result backend URL (`CELERY_RESULT_BACKEND`)
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub result_backend: Option<String>,
37
38    /// Task serializer format (`CELERY_TASK_SERIALIZER`)
39    #[serde(default = "default_serializer")]
40    pub task_serializer: String,
41
42    /// Result serializer format (`CELERY_RESULT_SERIALIZER`)
43    #[serde(default = "default_serializer")]
44    pub result_serializer: String,
45
46    /// Accepted content types (`CELERY_ACCEPT_CONTENT`)
47    #[serde(default = "default_accept_content")]
48    pub accept_content: Vec<String>,
49
50    /// Timezone for scheduling (`CELERY_TIMEZONE`)
51    #[serde(default = "default_timezone")]
52    pub timezone: String,
53
54    /// Use UTC timestamps (`CELERY_ENABLE_UTC`)
55    #[serde(default = "default_true")]
56    pub enable_utc: bool,
57
58    /// Track task started events (`CELERY_TASK_TRACK_STARTED`)
59    #[serde(default)]
60    pub task_track_started: bool,
61
62    /// Send task sent events (`CELERY_TASK_SEND_SENT_EVENT`)
63    #[serde(default)]
64    pub task_send_sent_event: bool,
65
66    /// Acknowledge tasks late (`CELERY_TASK_ACKS_LATE`)
67    #[serde(default)]
68    pub task_acks_late: bool,
69
70    /// Reject on worker lost (`CELERY_TASK_REJECT_ON_WORKER_LOST`)
71    #[serde(default)]
72    pub task_reject_on_worker_lost: bool,
73
74    /// Worker concurrency (`CELERYD_CONCURRENCY`)
75    #[serde(default = "default_concurrency")]
76    pub worker_concurrency: usize,
77
78    /// Worker prefetch multiplier (`CELERYD_PREFETCH_MULTIPLIER`)
79    #[serde(default = "default_prefetch_multiplier")]
80    pub worker_prefetch_multiplier: usize,
81
82    /// Maximum tasks per child before restart (`CELERYD_MAX_TASKS_PER_CHILD`)
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub worker_max_tasks_per_child: Option<usize>,
85
86    /// Maximum memory per child in KB (`CELERYD_MAX_MEMORY_PER_CHILD`)
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub worker_max_memory_per_child: Option<usize>,
89
90    /// Worker heartbeat interval in seconds (`CELERY_WORKER_HEARTBEAT`)
91    #[serde(default = "default_heartbeat_interval")]
92    pub worker_heartbeat: u64,
93
94    /// Task default queue (`CELERY_DEFAULT_QUEUE`)
95    #[serde(default = "default_queue_name")]
96    pub task_default_queue: String,
97
98    /// Task default exchange (`CELERY_DEFAULT_EXCHANGE`)
99    #[serde(default = "default_queue_name")]
100    pub task_default_exchange: String,
101
102    /// Task default exchange type (`CELERY_DEFAULT_EXCHANGE_TYPE`)
103    #[serde(default = "default_exchange_type")]
104    pub task_default_exchange_type: String,
105
106    /// Task default routing key (`CELERY_DEFAULT_ROUTING_KEY`)
107    #[serde(default = "default_queue_name")]
108    pub task_default_routing_key: String,
109
110    /// Task routes (`CELERY_TASK_ROUTES`)
111    #[serde(default)]
112    pub task_routes: HashMap<String, TaskRoute>,
113
114    /// Task time limit in seconds (`CELERY_TASK_TIME_LIMIT`)
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub task_time_limit: Option<u64>,
117
118    /// Task soft time limit in seconds (`CELERY_TASK_SOFT_TIME_LIMIT`)
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub task_soft_time_limit: Option<u64>,
121
122    /// Task default retry delay in seconds (`CELERY_TASK_DEFAULT_RETRY_DELAY`)
123    #[serde(default = "default_retry_delay")]
124    pub task_default_retry_delay: u64,
125
126    /// Task max retries (`CELERY_TASK_MAX_RETRIES`)
127    #[serde(default = "default_max_retries")]
128    pub task_max_retries: u32,
129
130    /// Result expires in seconds (`CELERY_RESULT_EXPIRES`)
131    #[serde(default = "default_result_expires")]
132    pub result_expires: u64,
133
134    /// Result compression (`CELERY_RESULT_COMPRESSION`)
135    #[serde(skip_serializing_if = "Option::is_none")]
136    pub result_compression: Option<String>,
137
138    /// Result compression threshold in bytes
139    #[serde(default = "default_compression_threshold")]
140    pub result_compression_threshold: usize,
141
142    /// Task-specific configurations
143    #[serde(default)]
144    pub task_annotations: HashMap<String, TaskConfig>,
145
146    /// Broker transport options (`CELERY_BROKER_TRANSPORT_OPTIONS`)
147    #[serde(default)]
148    pub broker_transport_options: BrokerTransport,
149
150    /// Result backend transport options
151    #[serde(default)]
152    pub result_backend_transport_options: BackendTransport,
153
154    /// Beat schedule configuration (`CELERYBEAT_SCHEDULE`)
155    #[serde(default)]
156    pub beat_schedule: HashMap<String, BeatSchedule>,
157
158    /// Custom configuration extensions
159    #[serde(flatten)]
160    pub custom: HashMap<String, serde_json::Value>,
161}
162
163/// Task routing configuration
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct TaskRoute {
166    /// Target queue name
167    pub queue: String,
168
169    /// Exchange name
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub exchange: Option<String>,
172
173    /// Routing key
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub routing_key: Option<String>,
176
177    /// Priority (0-255)
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub priority: Option<u8>,
180}
181
182/// Per-task configuration
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct TaskConfig {
185    /// Task time limit in seconds
186    #[serde(skip_serializing_if = "Option::is_none")]
187    pub time_limit: Option<u64>,
188
189    /// Task soft time limit in seconds
190    #[serde(skip_serializing_if = "Option::is_none")]
191    pub soft_time_limit: Option<u64>,
192
193    /// Max retries for this task
194    #[serde(skip_serializing_if = "Option::is_none")]
195    pub max_retries: Option<u32>,
196
197    /// Default retry delay
198    #[serde(skip_serializing_if = "Option::is_none")]
199    pub default_retry_delay: Option<u64>,
200
201    /// Task priority
202    #[serde(skip_serializing_if = "Option::is_none")]
203    pub priority: Option<u8>,
204
205    /// Target queue
206    #[serde(skip_serializing_if = "Option::is_none")]
207    pub queue: Option<String>,
208
209    /// Acknowledge late
210    #[serde(skip_serializing_if = "Option::is_none")]
211    pub acks_late: Option<bool>,
212
213    /// Track started
214    #[serde(skip_serializing_if = "Option::is_none")]
215    pub track_started: Option<bool>,
216
217    /// Rate limit (e.g., "10/s", "100/m", "1000/h")
218    #[serde(skip_serializing_if = "Option::is_none")]
219    pub rate_limit: Option<String>,
220}
221
222/// Broker transport options
223#[derive(Debug, Clone, Default, Serialize, Deserialize)]
224pub struct BrokerTransport {
225    /// Visibility timeout in seconds
226    #[serde(skip_serializing_if = "Option::is_none")]
227    pub visibility_timeout: Option<u64>,
228
229    /// Connection pool size
230    #[serde(skip_serializing_if = "Option::is_none")]
231    pub max_connections: Option<usize>,
232
233    /// Connection retry settings
234    #[serde(skip_serializing_if = "Option::is_none")]
235    pub max_retries: Option<u32>,
236
237    /// Retry interval in seconds
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub interval_start: Option<u64>,
240
241    /// Retry interval max in seconds
242    #[serde(skip_serializing_if = "Option::is_none")]
243    pub interval_max: Option<u64>,
244
245    /// Additional transport-specific options
246    #[serde(flatten)]
247    pub custom: HashMap<String, serde_json::Value>,
248}
249
250/// Result backend transport options
251#[derive(Debug, Clone, Default, Serialize, Deserialize)]
252pub struct BackendTransport {
253    /// Result expiration in seconds
254    #[serde(skip_serializing_if = "Option::is_none")]
255    pub result_expires: Option<u64>,
256
257    /// Connection pool size
258    #[serde(skip_serializing_if = "Option::is_none")]
259    pub max_connections: Option<usize>,
260
261    /// Additional backend-specific options
262    #[serde(flatten)]
263    pub custom: HashMap<String, serde_json::Value>,
264}
265
266/// Beat scheduler configuration
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct BeatSchedule {
269    /// Task name to execute
270    pub task: String,
271
272    /// Schedule definition
273    pub schedule: ScheduleDefinition,
274
275    /// Task arguments
276    #[serde(skip_serializing_if = "Option::is_none")]
277    pub args: Option<Vec<serde_json::Value>>,
278
279    /// Task keyword arguments
280    #[serde(skip_serializing_if = "Option::is_none")]
281    pub kwargs: Option<HashMap<String, serde_json::Value>>,
282
283    /// Task options
284    #[serde(skip_serializing_if = "Option::is_none")]
285    pub options: Option<TaskConfig>,
286}
287
288/// Schedule definition for beat tasks
289#[derive(Debug, Clone, Serialize, Deserialize)]
290#[serde(untagged)]
291pub enum ScheduleDefinition {
292    /// Crontab schedule (e.g., "0 0 * * *")
293    Crontab(String),
294
295    /// Interval in seconds
296    Interval(u64),
297
298    /// Complex schedule
299    Complex {
300        /// Schedule type (crontab, interval, solar)
301        #[serde(rename = "type")]
302        schedule_type: String,
303
304        /// Schedule value
305        value: serde_json::Value,
306    },
307}
308
309impl Default for CeleryConfig {
310    fn default() -> Self {
311        Self {
312            broker_url: "redis://localhost:6379/0".to_string(),
313            result_backend: Some("redis://localhost:6379/1".to_string()),
314            task_serializer: default_serializer(),
315            result_serializer: default_serializer(),
316            accept_content: default_accept_content(),
317            timezone: default_timezone(),
318            enable_utc: true,
319            task_track_started: false,
320            task_send_sent_event: false,
321            task_acks_late: false,
322            task_reject_on_worker_lost: false,
323            worker_concurrency: default_concurrency(),
324            worker_prefetch_multiplier: default_prefetch_multiplier(),
325            worker_max_tasks_per_child: None,
326            worker_max_memory_per_child: None,
327            worker_heartbeat: default_heartbeat_interval(),
328            task_default_queue: default_queue_name(),
329            task_default_exchange: default_queue_name(),
330            task_default_exchange_type: default_exchange_type(),
331            task_default_routing_key: default_queue_name(),
332            task_routes: HashMap::new(),
333            task_time_limit: None,
334            task_soft_time_limit: None,
335            task_default_retry_delay: default_retry_delay(),
336            task_max_retries: default_max_retries(),
337            result_expires: default_result_expires(),
338            result_compression: None,
339            result_compression_threshold: default_compression_threshold(),
340            task_annotations: HashMap::new(),
341            broker_transport_options: BrokerTransport::default(),
342            result_backend_transport_options: BackendTransport::default(),
343            beat_schedule: HashMap::new(),
344            custom: HashMap::new(),
345        }
346    }
347}
348
349impl CeleryConfig {
350    /// Create a new configuration with broker URL
351    #[inline]
352    pub fn new(broker_url: impl Into<String>) -> Self {
353        Self {
354            broker_url: broker_url.into(),
355            ..Default::default()
356        }
357    }
358
359    /// Set broker URL
360    #[inline]
361    #[must_use]
362    pub fn with_broker_url(mut self, url: impl Into<String>) -> Self {
363        self.broker_url = url.into();
364        self
365    }
366
367    /// Set result backend URL
368    #[inline]
369    #[must_use]
370    pub fn with_result_backend(mut self, url: impl Into<String>) -> Self {
371        self.result_backend = Some(url.into());
372        self
373    }
374
375    /// Set task serializer
376    #[inline]
377    #[must_use]
378    pub fn with_task_serializer(mut self, serializer: impl Into<String>) -> Self {
379        self.task_serializer = serializer.into();
380        self
381    }
382
383    /// Set result serializer
384    #[inline]
385    #[must_use]
386    pub fn with_result_serializer(mut self, serializer: impl Into<String>) -> Self {
387        self.result_serializer = serializer.into();
388        self
389    }
390
391    /// Set accepted content types
392    #[inline]
393    #[must_use]
394    pub fn with_accept_content(mut self, content: Vec<String>) -> Self {
395        self.accept_content = content;
396        self
397    }
398
399    /// Set timezone
400    #[inline]
401    #[must_use]
402    pub fn with_timezone(mut self, tz: impl Into<String>) -> Self {
403        self.timezone = tz.into();
404        self
405    }
406
407    /// Enable/disable UTC
408    #[must_use]
409    pub const fn with_enable_utc(mut self, enabled: bool) -> Self {
410        self.enable_utc = enabled;
411        self
412    }
413
414    /// Set worker concurrency
415    #[must_use]
416    pub const fn with_worker_concurrency(mut self, concurrency: usize) -> Self {
417        self.worker_concurrency = concurrency;
418        self
419    }
420
421    /// Set worker prefetch multiplier
422    #[must_use]
423    pub const fn with_prefetch_multiplier(mut self, multiplier: usize) -> Self {
424        self.worker_prefetch_multiplier = multiplier;
425        self
426    }
427
428    /// Set default queue name
429    #[inline]
430    #[must_use]
431    pub fn with_default_queue(mut self, queue: impl Into<String>) -> Self {
432        self.task_default_queue = queue.into();
433        self
434    }
435
436    /// Add task route
437    #[inline]
438    #[must_use]
439    pub fn with_task_route(mut self, task: impl Into<String>, route: TaskRoute) -> Self {
440        self.task_routes.insert(task.into(), route);
441        self
442    }
443
444    /// Add task annotation
445    #[inline]
446    #[must_use]
447    pub fn with_task_annotation(mut self, task: impl Into<String>, config: TaskConfig) -> Self {
448        self.task_annotations.insert(task.into(), config);
449        self
450    }
451
452    /// Set result expiration
453    #[must_use]
454    pub const fn with_result_expires(mut self, expires: u64) -> Self {
455        self.result_expires = expires;
456        self
457    }
458
459    /// Enable result compression
460    #[inline]
461    #[must_use]
462    pub fn with_result_compression(mut self, algorithm: impl Into<String>) -> Self {
463        self.result_compression = Some(algorithm.into());
464        self
465    }
466
467    /// Set compression threshold
468    #[must_use]
469    pub const fn with_compression_threshold(mut self, threshold: usize) -> Self {
470        self.result_compression_threshold = threshold;
471        self
472    }
473
474    /// Add beat schedule
475    #[inline]
476    #[must_use]
477    pub fn with_beat_schedule(mut self, name: impl Into<String>, schedule: BeatSchedule) -> Self {
478        self.beat_schedule.insert(name.into(), schedule);
479        self
480    }
481
482    /// Get task configuration for a specific task
483    #[inline]
484    #[must_use]
485    pub fn get_task_config(&self, task_name: &str) -> Option<&TaskConfig> {
486        self.task_annotations.get(task_name)
487    }
488
489    /// Get task route for a specific task
490    #[inline]
491    #[must_use]
492    pub fn get_task_route(&self, task_name: &str) -> Option<&TaskRoute> {
493        self.task_routes.get(task_name)
494    }
495
496    /// Get result expiration duration
497    #[inline]
498    #[must_use]
499    pub const fn result_expires_duration(&self) -> Duration {
500        Duration::from_secs(self.result_expires)
501    }
502
503    /// Get task time limit duration
504    #[inline]
505    #[must_use]
506    pub fn task_time_limit_duration(&self) -> Option<Duration> {
507        self.task_time_limit.map(Duration::from_secs)
508    }
509
510    /// Get task soft time limit duration
511    #[inline]
512    #[must_use]
513    pub fn task_soft_time_limit_duration(&self) -> Option<Duration> {
514        self.task_soft_time_limit.map(Duration::from_secs)
515    }
516
517    /// Load configuration from environment variables
518    ///
519    /// Supports all standard `CELERY_*` and `CELERYD_*` environment variables.
520    /// Boolean values accept: `true`/`false`, `1`/`0`, `yes`/`no`, `on`/`off`.
521    #[must_use]
522    pub fn from_env() -> Self {
523        let mut config = Self::default();
524
525        // String env vars
526        if let Ok(url) = std::env::var("CELERY_BROKER_URL") {
527            config.broker_url = url;
528        }
529        if let Ok(backend) = std::env::var("CELERY_RESULT_BACKEND") {
530            config.result_backend = Some(backend);
531        }
532        if let Ok(serializer) = std::env::var("CELERY_TASK_SERIALIZER") {
533            config.task_serializer = serializer;
534        }
535        if let Ok(serializer) = std::env::var("CELERY_RESULT_SERIALIZER") {
536            config.result_serializer = serializer;
537        }
538        if let Ok(tz) = std::env::var("CELERY_TIMEZONE") {
539            config.timezone = tz;
540        }
541        if let Ok(queue) = std::env::var("CELERY_DEFAULT_QUEUE") {
542            config.task_default_queue = queue;
543        }
544        if let Ok(exchange) = std::env::var("CELERY_DEFAULT_EXCHANGE") {
545            config.task_default_exchange = exchange;
546        }
547        if let Ok(exchange_type) = std::env::var("CELERY_DEFAULT_EXCHANGE_TYPE") {
548            config.task_default_exchange_type = exchange_type;
549        }
550        if let Ok(routing_key) = std::env::var("CELERY_DEFAULT_ROUTING_KEY") {
551            config.task_default_routing_key = routing_key;
552        }
553
554        // Boolean env vars
555        if let Some(val) = parse_env_bool("CELERY_ENABLE_UTC") {
556            config.enable_utc = val;
557        }
558        if let Some(val) = parse_env_bool("CELERY_TASK_TRACK_STARTED") {
559            config.task_track_started = val;
560        }
561        if let Some(val) = parse_env_bool("CELERY_TASK_SEND_SENT_EVENT") {
562            config.task_send_sent_event = val;
563        }
564        if let Some(val) = parse_env_bool("CELERY_TASK_ACKS_LATE") {
565            config.task_acks_late = val;
566        }
567        if let Some(val) = parse_env_bool("CELERY_TASK_REJECT_ON_WORKER_LOST") {
568            config.task_reject_on_worker_lost = val;
569        }
570
571        // Numeric env vars
572        if let Ok(concurrency) = std::env::var("CELERYD_CONCURRENCY") {
573            if let Ok(val) = concurrency.parse() {
574                config.worker_concurrency = val;
575            }
576        }
577        if let Some(val) = parse_env_usize("CELERYD_PREFETCH_MULTIPLIER") {
578            config.worker_prefetch_multiplier = val;
579        }
580        if let Some(val) = parse_env_usize("CELERYD_MAX_TASKS_PER_CHILD") {
581            config.worker_max_tasks_per_child = Some(val);
582        }
583        if let Some(val) = parse_env_usize("CELERYD_MAX_MEMORY_PER_CHILD") {
584            config.worker_max_memory_per_child = Some(val);
585        }
586
587        // Duration / numeric env vars (seconds)
588        if let Some(val) = parse_env_u64("CELERY_TASK_TIME_LIMIT") {
589            config.task_time_limit = Some(val);
590        }
591        if let Some(val) = parse_env_u64("CELERY_TASK_SOFT_TIME_LIMIT") {
592            config.task_soft_time_limit = Some(val);
593        }
594        if let Some(val) = parse_env_u64("CELERY_TASK_DEFAULT_RETRY_DELAY") {
595            config.task_default_retry_delay = val;
596        }
597        if let Some(val) = parse_env_u32("CELERY_TASK_MAX_RETRIES") {
598            config.task_max_retries = val;
599        }
600        if let Some(val) = parse_env_u64("CELERY_RESULT_EXPIRES") {
601            config.result_expires = val;
602        }
603
604        config
605    }
606
607    /// Perform detailed configuration validation, returning structured errors and warnings
608    pub fn validate_detailed(&self) -> ConfigValidation {
609        let mut validation = ConfigValidation::new();
610
611        // Validate broker URL
612        if self.broker_url.is_empty() {
613            validation.add_error(
614                "broker_url",
615                "broker URL is required",
616                Some("set CELERY_BROKER_URL environment variable".to_string()),
617            );
618        } else if !self.broker_url.starts_with("redis://")
619            && !self.broker_url.starts_with("rediss://")
620            && !self.broker_url.starts_with("amqp://")
621            && !self.broker_url.starts_with("amqps://")
622            && !self.broker_url.starts_with("sqs://")
623            && !self.broker_url.starts_with("postgres://")
624            && !self.broker_url.starts_with("postgresql://")
625            && !self.broker_url.starts_with("mysql://")
626        {
627            validation.add_error(
628                "broker_url",
629                format!("unrecognized broker URL scheme: {}", self.broker_url),
630                Some("use redis://, amqp://, sqs://, postgres://, or mysql://".to_string()),
631            );
632        }
633
634        // Validate result backend URL
635        if let Some(ref url) = self.result_backend {
636            if !url.starts_with("redis://")
637                && !url.starts_with("rediss://")
638                && !url.starts_with("postgres://")
639                && !url.starts_with("postgresql://")
640                && !url.starts_with("mysql://")
641                && !url.starts_with("grpc://")
642            {
643                validation.add_error(
644                    "result_backend",
645                    format!("unrecognized result backend URL scheme: {}", url),
646                    Some("use redis://, postgres://, mysql://, or grpc://".to_string()),
647                );
648            }
649        }
650
651        // Validate serializers
652        let valid_serializers = ["json", "msgpack", "yaml", "pickle", "bson", "protobuf"];
653        if !valid_serializers.contains(&self.task_serializer.as_str()) {
654            validation.add_error(
655                "task_serializer",
656                format!("unknown serializer: {}", self.task_serializer),
657                Some(format!("use one of: {}", valid_serializers.join(", "))),
658            );
659        }
660        if !valid_serializers.contains(&self.result_serializer.as_str()) {
661            validation.add_error(
662                "result_serializer",
663                format!("unknown serializer: {}", self.result_serializer),
664                Some(format!("use one of: {}", valid_serializers.join(", "))),
665            );
666        }
667
668        // Validate concurrency
669        if self.worker_concurrency == 0 {
670            validation.add_error(
671                "worker_concurrency",
672                "concurrency must be at least 1",
673                Some("set CELERYD_CONCURRENCY to a positive integer".to_string()),
674            );
675        }
676        if self.worker_concurrency > 1024 {
677            validation.add_warning(
678                "worker_concurrency",
679                format!(
680                    "high concurrency value ({}), may cause resource exhaustion",
681                    self.worker_concurrency
682                ),
683            );
684        }
685
686        // Validate time limits
687        if let (Some(hard), Some(soft)) = (self.task_time_limit, self.task_soft_time_limit) {
688            if soft >= hard {
689                validation.add_warning(
690                    "task_soft_time_limit",
691                    "soft time limit should be less than hard time limit",
692                );
693            }
694        }
695
696        // Validate prefetch
697        if self.worker_prefetch_multiplier == 0 {
698            validation.add_warning(
699                "worker_prefetch_multiplier",
700                "prefetch multiplier of 0 disables prefetching, consider setting to 1",
701            );
702        }
703
704        // Validate retry settings
705        if self.task_max_retries > 100 {
706            validation.add_warning(
707                "task_max_retries",
708                format!(
709                    "high max retries ({}), may cause infinite retry loops",
710                    self.task_max_retries
711                ),
712            );
713        }
714
715        validation
716    }
717
718    /// Export configuration as environment variable key-value pairs
719    pub fn to_env_vars(&self) -> Vec<(String, String)> {
720        let mut vars = Vec::new();
721
722        vars.push(("CELERY_BROKER_URL".to_string(), self.broker_url.clone()));
723        if let Some(ref backend) = self.result_backend {
724            vars.push(("CELERY_RESULT_BACKEND".to_string(), backend.clone()));
725        }
726        vars.push((
727            "CELERY_TASK_SERIALIZER".to_string(),
728            self.task_serializer.clone(),
729        ));
730        vars.push((
731            "CELERY_RESULT_SERIALIZER".to_string(),
732            self.result_serializer.clone(),
733        ));
734        vars.push(("CELERY_TIMEZONE".to_string(), self.timezone.clone()));
735        vars.push(("CELERY_ENABLE_UTC".to_string(), self.enable_utc.to_string()));
736        vars.push((
737            "CELERY_TASK_TRACK_STARTED".to_string(),
738            self.task_track_started.to_string(),
739        ));
740        vars.push((
741            "CELERY_TASK_SEND_SENT_EVENT".to_string(),
742            self.task_send_sent_event.to_string(),
743        ));
744        vars.push((
745            "CELERY_TASK_ACKS_LATE".to_string(),
746            self.task_acks_late.to_string(),
747        ));
748        vars.push((
749            "CELERY_TASK_REJECT_ON_WORKER_LOST".to_string(),
750            self.task_reject_on_worker_lost.to_string(),
751        ));
752        vars.push((
753            "CELERYD_CONCURRENCY".to_string(),
754            self.worker_concurrency.to_string(),
755        ));
756        vars.push((
757            "CELERYD_PREFETCH_MULTIPLIER".to_string(),
758            self.worker_prefetch_multiplier.to_string(),
759        ));
760        if let Some(val) = self.worker_max_tasks_per_child {
761            vars.push(("CELERYD_MAX_TASKS_PER_CHILD".to_string(), val.to_string()));
762        }
763        if let Some(val) = self.worker_max_memory_per_child {
764            vars.push(("CELERYD_MAX_MEMORY_PER_CHILD".to_string(), val.to_string()));
765        }
766        vars.push((
767            "CELERY_DEFAULT_QUEUE".to_string(),
768            self.task_default_queue.clone(),
769        ));
770        vars.push((
771            "CELERY_DEFAULT_EXCHANGE".to_string(),
772            self.task_default_exchange.clone(),
773        ));
774        vars.push((
775            "CELERY_DEFAULT_EXCHANGE_TYPE".to_string(),
776            self.task_default_exchange_type.clone(),
777        ));
778        vars.push((
779            "CELERY_DEFAULT_ROUTING_KEY".to_string(),
780            self.task_default_routing_key.clone(),
781        ));
782        if let Some(val) = self.task_time_limit {
783            vars.push(("CELERY_TASK_TIME_LIMIT".to_string(), val.to_string()));
784        }
785        if let Some(val) = self.task_soft_time_limit {
786            vars.push(("CELERY_TASK_SOFT_TIME_LIMIT".to_string(), val.to_string()));
787        }
788        vars.push((
789            "CELERY_TASK_DEFAULT_RETRY_DELAY".to_string(),
790            self.task_default_retry_delay.to_string(),
791        ));
792        vars.push((
793            "CELERY_TASK_MAX_RETRIES".to_string(),
794            self.task_max_retries.to_string(),
795        ));
796        vars.push((
797            "CELERY_RESULT_EXPIRES".to_string(),
798            self.result_expires.to_string(),
799        ));
800
801        vars
802    }
803
804    /// Dump configuration as a formatted debug string
805    pub fn dump(&self) -> String {
806        let mut output = String::from("CeleRS Configuration:\n");
807        output.push_str(&format!("  broker_url: {}\n", self.broker_url));
808        output.push_str(&format!("  result_backend: {:?}\n", self.result_backend));
809        output.push_str(&format!("  task_serializer: {}\n", self.task_serializer));
810        output.push_str(&format!(
811            "  result_serializer: {}\n",
812            self.result_serializer
813        ));
814        output.push_str(&format!("  timezone: {}\n", self.timezone));
815        output.push_str(&format!("  enable_utc: {}\n", self.enable_utc));
816        output.push_str(&format!(
817            "  task_track_started: {}\n",
818            self.task_track_started
819        ));
820        output.push_str(&format!(
821            "  task_send_sent_event: {}\n",
822            self.task_send_sent_event
823        ));
824        output.push_str(&format!("  task_acks_late: {}\n", self.task_acks_late));
825        output.push_str(&format!(
826            "  task_reject_on_worker_lost: {}\n",
827            self.task_reject_on_worker_lost
828        ));
829        output.push_str(&format!(
830            "  worker_concurrency: {}\n",
831            self.worker_concurrency
832        ));
833        output.push_str(&format!(
834            "  worker_prefetch_multiplier: {}\n",
835            self.worker_prefetch_multiplier
836        ));
837        output.push_str(&format!(
838            "  worker_max_tasks_per_child: {:?}\n",
839            self.worker_max_tasks_per_child
840        ));
841        output.push_str(&format!(
842            "  worker_max_memory_per_child: {:?}\n",
843            self.worker_max_memory_per_child
844        ));
845        output.push_str(&format!("  worker_heartbeat: {}s\n", self.worker_heartbeat));
846        output.push_str(&format!(
847            "  task_default_queue: {}\n",
848            self.task_default_queue
849        ));
850        output.push_str(&format!(
851            "  task_default_exchange: {}\n",
852            self.task_default_exchange
853        ));
854        output.push_str(&format!(
855            "  task_default_exchange_type: {}\n",
856            self.task_default_exchange_type
857        ));
858        output.push_str(&format!(
859            "  task_default_routing_key: {}\n",
860            self.task_default_routing_key
861        ));
862        output.push_str(&format!("  task_time_limit: {:?}\n", self.task_time_limit));
863        output.push_str(&format!(
864            "  task_soft_time_limit: {:?}\n",
865            self.task_soft_time_limit
866        ));
867        output.push_str(&format!(
868            "  task_default_retry_delay: {}s\n",
869            self.task_default_retry_delay
870        ));
871        output.push_str(&format!("  task_max_retries: {}\n", self.task_max_retries));
872        output.push_str(&format!("  result_expires: {}s\n", self.result_expires));
873        output.push_str(&format!(
874            "  result_compression: {:?}\n",
875            self.result_compression
876        ));
877        output.push_str(&format!(
878            "  result_compression_threshold: {} bytes\n",
879            self.result_compression_threshold
880        ));
881        output.push_str(&format!(
882            "  task_routes: {} route(s)\n",
883            self.task_routes.len()
884        ));
885        output.push_str(&format!(
886            "  task_annotations: {} annotation(s)\n",
887            self.task_annotations.len()
888        ));
889        output.push_str(&format!(
890            "  beat_schedule: {} schedule(s)\n",
891            self.beat_schedule.len()
892        ));
893        output
894    }
895
896    /// Validate configuration
897    ///
898    /// # Errors
899    ///
900    /// Returns an error if the configuration is invalid (e.g., empty broker URL, invalid concurrency, unsupported serializer).
901    pub fn validate(&self) -> Result<(), String> {
902        if self.broker_url.is_empty() {
903            return Err("broker_url is required".to_string());
904        }
905
906        if self.worker_concurrency == 0 {
907            return Err("worker_concurrency must be greater than 0".to_string());
908        }
909
910        if !["json", "msgpack", "yaml", "pickle"].contains(&self.task_serializer.as_str()) {
911            return Err(format!(
912                "Unsupported task_serializer: {}",
913                self.task_serializer
914            ));
915        }
916
917        Ok(())
918    }
919}
920
921// Default value functions
/// Serde default for the serializer fields: `"json"`.
fn default_serializer() -> String {
    String::from("json")
}
925
/// Serde default for `accept_content`: `json` followed by `msgpack`.
fn default_accept_content() -> Vec<String> {
    ["json", "msgpack"]
        .iter()
        .map(|&name| String::from(name))
        .collect()
}
929
/// Serde default for `timezone`: `"UTC"`.
fn default_timezone() -> String {
    String::from("UTC")
}
933
/// Serde default for boolean options that are switched on unless configured
/// otherwise (for example `enable_utc`).
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
937
/// Default concurrency: whatever `num_cpus::get()` reports for this machine.
// NOTE(review): presumably wired up as the serde default for
// `worker_concurrency` — confirm at the field declaration.
fn default_concurrency() -> usize {
    num_cpus::get()
}
941
/// Default prefetch multiplier: `4`.
fn default_prefetch_multiplier() -> usize {
    const PREFETCH_MULTIPLIER: usize = 4;
    PREFETCH_MULTIPLIER
}
945
/// Default heartbeat interval: `10`.
// NOTE(review): unit is presumably seconds — confirm against the field that uses it.
fn default_heartbeat_interval() -> u64 {
    const HEARTBEAT_INTERVAL: u64 = 10;
    HEARTBEAT_INTERVAL
}
949
/// Default queue name: `"celery"`.
fn default_queue_name() -> String {
    String::from("celery")
}
953
/// Default exchange type: `"direct"`.
fn default_exchange_type() -> String {
    String::from("direct")
}
957
/// Default retry delay: three minutes, expressed in seconds (`180`).
fn default_retry_delay() -> u64 {
    const SECONDS_PER_MINUTE: u64 = 60;
    3 * SECONDS_PER_MINUTE
}
961
/// Default maximum number of retries: `3`.
fn default_max_retries() -> u32 {
    const MAX_RETRIES: u32 = 3;
    MAX_RETRIES
}
965
/// Default result expiry: 24 hours, expressed in seconds (`86400`).
fn default_result_expires() -> u64 {
    const SECONDS_PER_HOUR: u64 = 60 * 60;
    24 * SECONDS_PER_HOUR
}
969
/// Default compression threshold: 1 MiB (`1_048_576`).
fn default_compression_threshold() -> usize {
    // 2^20 == 1024 * 1024
    1 << 20
}
973
974// --- Environment variable parsing helpers ---
975
/// Read the environment variable `var` and interpret it as a boolean.
///
/// Recognised spellings (compared case-insensitively):
/// `true`/`1`/`yes`/`on` map to `Some(true)` and
/// `false`/`0`/`no`/`off` map to `Some(false)`.
///
/// Returns `None` when the variable cannot be read (unset or not valid
/// Unicode) or holds any other text. The value is not trimmed, so surrounding
/// whitespace makes it unrecognised.
fn parse_env_bool(var: &str) -> Option<bool> {
    let raw = std::env::var(var).ok()?;
    match raw.to_lowercase().as_str() {
        "true" | "1" | "yes" | "on" => Some(true),
        "false" | "0" | "no" | "off" => Some(false),
        _ => None,
    }
}
988
/// Read the environment variable `var` and parse it as a `u64`.
///
/// Returns `None` when the variable cannot be read (unset or not valid
/// Unicode) or its text is not a valid `u64`.
fn parse_env_u64(var: &str) -> Option<u64> {
    let raw = std::env::var(var).ok()?;
    raw.parse::<u64>().ok()
}
992
/// Read the environment variable `var` and parse it as a `u32`.
///
/// Returns `None` when the variable cannot be read (unset or not valid
/// Unicode) or its text is not a valid `u32`.
fn parse_env_u32(var: &str) -> Option<u32> {
    let raw = std::env::var(var).ok()?;
    raw.parse::<u32>().ok()
}
996
/// Read the environment variable `var` and parse it as a `usize`.
///
/// Returns `None` when the variable cannot be read (unset or not valid
/// Unicode) or its text is not a valid `usize`.
fn parse_env_usize(var: &str) -> Option<usize> {
    let raw = std::env::var(var).ok()?;
    raw.parse::<usize>().ok()
}
1000
1001// --- Detailed configuration validation types ---
1002
1003/// Detailed configuration validation result containing structured errors and warnings
1004#[derive(Debug, Clone, Default)]
1005pub struct ConfigValidation {
1006    /// Configuration errors that must be fixed
1007    pub errors: Vec<ConfigError>,
1008    /// Configuration warnings that may indicate suboptimal settings
1009    pub warnings: Vec<ConfigWarning>,
1010}
1011
1012impl ConfigValidation {
1013    /// Create an empty validation result
1014    pub fn new() -> Self {
1015        Self::default()
1016    }
1017
1018    /// Returns `true` if there are no errors
1019    pub fn is_valid(&self) -> bool {
1020        self.errors.is_empty()
1021    }
1022
1023    /// Returns `true` if there are any warnings
1024    pub fn has_warnings(&self) -> bool {
1025        !self.warnings.is_empty()
1026    }
1027
1028    /// Number of errors
1029    pub fn error_count(&self) -> usize {
1030        self.errors.len()
1031    }
1032
1033    /// Number of warnings
1034    pub fn warning_count(&self) -> usize {
1035        self.warnings.len()
1036    }
1037
1038    fn add_error(
1039        &mut self,
1040        field: impl Into<String>,
1041        message: impl Into<String>,
1042        suggestion: Option<String>,
1043    ) {
1044        self.errors.push(ConfigError {
1045            field: field.into(),
1046            message: message.into(),
1047            suggestion,
1048        });
1049    }
1050
1051    fn add_warning(&mut self, field: impl Into<String>, message: impl Into<String>) {
1052        self.warnings.push(ConfigWarning {
1053            field: field.into(),
1054            message: message.into(),
1055        });
1056    }
1057}
1058
/// A configuration error with an optional suggestion for fixing it.
///
/// Formats via [`std::fmt::Display`] as `[field] message`, followed by
/// ` (suggestion: …)` when a suggestion is present. It also implements
/// [`std::error::Error`], so it can be boxed as `dyn Error` or propagated
/// with `?` into error types that accept any standard error.
#[derive(Debug, Clone)]
pub struct ConfigError {
    /// The configuration field name
    pub field: String,
    /// Human-readable error message
    pub message: String,
    /// Optional suggestion for fixing the error
    pub suggestion: Option<String>,
}

impl std::fmt::Display for ConfigError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}", self.field, self.message)?;
        // The suggestion is appended only when one was provided.
        if let Some(suggestion) = &self.suggestion {
            write!(f, " (suggestion: {suggestion})")?;
        }
        Ok(())
    }
}

// No underlying cause is tracked, so the default `source()` (`None`) is correct.
impl std::error::Error for ConfigError {}
1079
/// A non-fatal note about a setting that is potentially suboptimal.
///
/// Formats via [`std::fmt::Display`] as `[field] message`.
#[derive(Debug, Clone)]
pub struct ConfigWarning {
    /// Name of the configuration field the warning refers to
    pub field: String,
    /// Human-readable description of the concern
    pub message: String,
}

impl std::fmt::Display for ConfigWarning {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { field, message } = self;
        write!(f, "[{field}] {message}")
    }
}
1094
// Unit tests for the configuration types in this file: defaults, builder
// methods, environment-variable loading, both validation entry points, and
// the `Display` output of the validation types.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    // Mutex to serialize env-var-mutating tests (env vars are process-global).
    // Every test that calls `set_env`/`remove_env` takes this lock first.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Helper to set env vars in tests.
    ///
    /// Test-only; every caller in this module holds `ENV_LOCK` while calling it.
    fn set_env(key: &str, val: &str) {
        // SAFETY: callers in this module hold ENV_LOCK, so env-mutating tests do
        // not race with each other.
        // NOTE(review): tests that do not take the lock must not touch the
        // environment concurrently — confirm that holds for `Default`.
        unsafe { std::env::set_var(key, val) };
    }

    /// Helper to remove env vars in tests.
    ///
    /// Test-only; every caller in this module holds `ENV_LOCK` while calling it.
    fn remove_env(key: &str) {
        // SAFETY: same reasoning as `set_env` — callers hold ENV_LOCK.
        unsafe { std::env::remove_var(key) };
    }

    /// List of all CELERY_* env var keys used by from_env(), for cleanup.
    const ALL_ENV_KEYS: &[&str] = &[
        "CELERY_BROKER_URL",
        "CELERY_RESULT_BACKEND",
        "CELERY_TASK_SERIALIZER",
        "CELERY_RESULT_SERIALIZER",
        "CELERY_TIMEZONE",
        "CELERY_DEFAULT_QUEUE",
        "CELERY_DEFAULT_EXCHANGE",
        "CELERY_DEFAULT_EXCHANGE_TYPE",
        "CELERY_DEFAULT_ROUTING_KEY",
        "CELERY_ENABLE_UTC",
        "CELERY_TASK_TRACK_STARTED",
        "CELERY_TASK_SEND_SENT_EVENT",
        "CELERY_TASK_ACKS_LATE",
        "CELERY_TASK_REJECT_ON_WORKER_LOST",
        "CELERYD_CONCURRENCY",
        "CELERYD_PREFETCH_MULTIPLIER",
        "CELERYD_MAX_TASKS_PER_CHILD",
        "CELERYD_MAX_MEMORY_PER_CHILD",
        "CELERY_TASK_TIME_LIMIT",
        "CELERY_TASK_SOFT_TIME_LIMIT",
        "CELERY_TASK_DEFAULT_RETRY_DELAY",
        "CELERY_TASK_MAX_RETRIES",
        "CELERY_RESULT_EXPIRES",
    ];

    /// Remove every key in `ALL_ENV_KEYS` so a test starts from (and leaves
    /// behind) an environment without any of them set.
    fn cleanup_env() {
        for key in ALL_ENV_KEYS {
            remove_env(key);
        }
    }

    // `Default` should yield the local Redis broker, JSON serialization and UTC.
    #[test]
    fn test_default_config() {
        let config = CeleryConfig::default();
        assert_eq!(config.broker_url, "redis://localhost:6379/0");
        assert_eq!(config.task_serializer, "json");
        assert_eq!(config.timezone, "UTC");
        assert!(config.enable_utc);
    }

    // Chained `with_*` builder calls should be reflected in the resulting fields.
    #[test]
    fn test_config_builder() {
        let config = CeleryConfig::new("redis://localhost:6379/0")
            .with_result_backend("redis://localhost:6379/1")
            .with_worker_concurrency(8)
            .with_default_queue("my_queue");

        assert_eq!(config.worker_concurrency, 8);
        assert_eq!(config.task_default_queue, "my_queue");
    }

    // `validate` accepts the default config and rejects an empty broker URL.
    #[test]
    fn test_config_validation() {
        let config = CeleryConfig::default();
        assert!(config.validate().is_ok());

        let invalid = CeleryConfig {
            broker_url: String::new(),
            ..Default::default()
        };
        assert!(invalid.validate().is_err());
    }

    // A route registered with `with_task_route` can be looked up by task name.
    #[test]
    fn test_task_route() {
        let route = TaskRoute {
            queue: "high_priority".to_string(),
            exchange: Some("tasks".to_string()),
            routing_key: Some("task.high".to_string()),
            priority: Some(9),
        };

        let config = CeleryConfig::default().with_task_route("important_task", route);

        assert!(config.get_task_route("important_task").is_some());
    }

    // The default `result_expires` converts to a 24-hour `Duration`.
    #[test]
    fn test_duration_conversions() {
        let config = CeleryConfig::default();
        assert_eq!(config.result_expires_duration(), Duration::from_secs(86400));
    }

    // `from_env` should pick up boolean settings in each accepted spelling.
    #[test]
    fn test_from_env_boolean_vars() {
        // The `LockResult` is bound without unwrapping: the guard lives inside
        // both the `Ok` and the poisoned `Err` variant, so the lock stays held
        // until the end of the test even if an earlier test panicked.
        let _guard = ENV_LOCK.lock();
        cleanup_env();

        set_env("CELERY_ENABLE_UTC", "true");
        set_env("CELERY_TASK_TRACK_STARTED", "1");
        set_env("CELERY_TASK_SEND_SENT_EVENT", "yes");
        set_env("CELERY_TASK_ACKS_LATE", "on");
        set_env("CELERY_TASK_REJECT_ON_WORKER_LOST", "false");

        let config = CeleryConfig::from_env();
        assert!(config.enable_utc);
        assert!(config.task_track_started);
        assert!(config.task_send_sent_event);
        assert!(config.task_acks_late);
        assert!(!config.task_reject_on_worker_lost);

        // Not reached if an assertion above panics; the next env test clears
        // the variables itself before setting its own.
        cleanup_env();
    }

    // `from_env` should parse the `CELERYD_*` worker sizing variables as numbers.
    #[test]
    fn test_from_env_numeric_vars() {
        let _guard = ENV_LOCK.lock();
        cleanup_env();

        set_env("CELERYD_PREFETCH_MULTIPLIER", "8");
        set_env("CELERYD_CONCURRENCY", "16");
        set_env("CELERYD_MAX_TASKS_PER_CHILD", "1000");
        set_env("CELERYD_MAX_MEMORY_PER_CHILD", "524288");

        let config = CeleryConfig::from_env();
        assert_eq!(config.worker_prefetch_multiplier, 8);
        assert_eq!(config.worker_concurrency, 16);
        assert_eq!(config.worker_max_tasks_per_child, Some(1000));
        assert_eq!(config.worker_max_memory_per_child, Some(524288));

        cleanup_env();
    }

    // `from_env` should copy string-valued routing and serializer variables verbatim.
    #[test]
    fn test_from_env_string_vars() {
        let _guard = ENV_LOCK.lock();
        cleanup_env();

        set_env("CELERY_DEFAULT_QUEUE", "myqueue");
        set_env("CELERY_DEFAULT_EXCHANGE", "myexchange");
        set_env("CELERY_DEFAULT_EXCHANGE_TYPE", "topic");
        set_env("CELERY_DEFAULT_ROUTING_KEY", "task.default");
        set_env("CELERY_RESULT_SERIALIZER", "msgpack");

        let config = CeleryConfig::from_env();
        assert_eq!(config.task_default_queue, "myqueue");
        assert_eq!(config.task_default_exchange, "myexchange");
        assert_eq!(config.task_default_exchange_type, "topic");
        assert_eq!(config.task_default_routing_key, "task.default");
        assert_eq!(config.result_serializer, "msgpack");

        cleanup_env();
    }

    // `from_env` should parse time limits, retry settings and result expiry.
    #[test]
    fn test_from_env_duration_vars() {
        let _guard = ENV_LOCK.lock();
        cleanup_env();

        set_env("CELERY_TASK_TIME_LIMIT", "300");
        set_env("CELERY_TASK_SOFT_TIME_LIMIT", "240");
        set_env("CELERY_TASK_DEFAULT_RETRY_DELAY", "60");
        set_env("CELERY_TASK_MAX_RETRIES", "5");
        set_env("CELERY_RESULT_EXPIRES", "3600");

        let config = CeleryConfig::from_env();
        assert_eq!(config.task_time_limit, Some(300));
        assert_eq!(config.task_soft_time_limit, Some(240));
        assert_eq!(config.task_default_retry_delay, 60);
        assert_eq!(config.task_max_retries, 5);
        assert_eq!(config.result_expires, 3600);

        cleanup_env();
    }

    // Exercises `parse_env_bool` directly across truthy, falsy, unrecognised
    // and missing values, reusing one variable name for every case.
    #[test]
    fn test_parse_env_bool_variants() {
        let _guard = ENV_LOCK.lock();
        cleanup_env();

        // Truthy values
        for val in &["true", "TRUE", "True", "1", "yes", "YES", "on", "ON"] {
            set_env("CELERY_ENABLE_UTC", val);
            assert_eq!(
                parse_env_bool("CELERY_ENABLE_UTC"),
                Some(true),
                "failed for {}",
                val
            );
        }

        // Falsy values
        for val in &["false", "FALSE", "False", "0", "no", "NO", "off", "OFF"] {
            set_env("CELERY_ENABLE_UTC", val);
            assert_eq!(
                parse_env_bool("CELERY_ENABLE_UTC"),
                Some(false),
                "failed for {}",
                val
            );
        }

        // Invalid values return None
        set_env("CELERY_ENABLE_UTC", "maybe");
        assert_eq!(parse_env_bool("CELERY_ENABLE_UTC"), None);

        // Missing var returns None
        remove_env("CELERY_ENABLE_UTC");
        assert_eq!(parse_env_bool("CELERY_ENABLE_UTC"), None);

        cleanup_env();
    }

    // The default configuration should produce no errors from `validate_detailed`.
    #[test]
    fn test_validate_detailed_valid_config() {
        let config = CeleryConfig::default();
        let validation = config.validate_detailed();
        assert!(validation.is_valid());
        assert_eq!(validation.error_count(), 0);
    }

    // An `ftp://` broker URL should be reported as an error on `broker_url`.
    #[test]
    fn test_validate_detailed_invalid_broker_url() {
        let config = CeleryConfig {
            broker_url: "ftp://bad-scheme".to_string(),
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(!validation.is_valid());
        assert!(validation.errors.iter().any(|e| e.field == "broker_url"));
    }

    // An unknown task serializer should be reported as an error on `task_serializer`.
    #[test]
    fn test_validate_detailed_invalid_serializer() {
        let config = CeleryConfig {
            task_serializer: "xml".to_string(),
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(!validation.is_valid());
        assert!(validation
            .errors
            .iter()
            .any(|e| e.field == "task_serializer"));
    }

    // Zero concurrency should be reported as an error on `worker_concurrency`.
    #[test]
    fn test_validate_detailed_zero_concurrency() {
        let config = CeleryConfig {
            worker_concurrency: 0,
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(!validation.is_valid());
        assert!(validation
            .errors
            .iter()
            .any(|e| e.field == "worker_concurrency"));
    }

    // A soft limit above the hard limit should raise a warning on `task_soft_time_limit`.
    #[test]
    fn test_validate_detailed_time_limit_warning() {
        let config = CeleryConfig {
            task_time_limit: Some(60),
            task_soft_time_limit: Some(120), // soft >= hard
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(validation.has_warnings());
        assert!(validation
            .warnings
            .iter()
            .any(|w| w.field == "task_soft_time_limit"));
    }

    // `to_env_vars` should emit each configured value under its env-var name.
    // Despite the name, this only checks the export side; nothing is read back
    // through `from_env`.
    #[test]
    fn test_to_env_vars_roundtrip() {
        let config = CeleryConfig::new("amqp://localhost:5672")
            .with_result_backend("redis://localhost:6379/1")
            .with_task_serializer("msgpack")
            .with_result_serializer("json")
            .with_timezone("US/Eastern")
            .with_enable_utc(false)
            .with_worker_concurrency(12)
            .with_prefetch_multiplier(2)
            .with_default_queue("tasks");

        let vars = config.to_env_vars();

        // Check that key env vars are present with correct values
        let find_var = |key: &str| -> Option<String> {
            vars.iter().find(|(k, _)| k == key).map(|(_, v)| v.clone())
        };

        assert_eq!(
            find_var("CELERY_BROKER_URL").as_deref(),
            Some("amqp://localhost:5672")
        );
        assert_eq!(
            find_var("CELERY_RESULT_BACKEND").as_deref(),
            Some("redis://localhost:6379/1")
        );
        assert_eq!(
            find_var("CELERY_TASK_SERIALIZER").as_deref(),
            Some("msgpack")
        );
        assert_eq!(
            find_var("CELERY_RESULT_SERIALIZER").as_deref(),
            Some("json")
        );
        assert_eq!(find_var("CELERY_TIMEZONE").as_deref(), Some("US/Eastern"));
        assert_eq!(find_var("CELERY_ENABLE_UTC").as_deref(), Some("false"));
        assert_eq!(find_var("CELERYD_CONCURRENCY").as_deref(), Some("12"));
        assert_eq!(
            find_var("CELERYD_PREFETCH_MULTIPLIER").as_deref(),
            Some("2")
        );
        assert_eq!(find_var("CELERY_DEFAULT_QUEUE").as_deref(), Some("tasks"));
    }

    // `dump` should start with its header line and mention a sample of field labels.
    #[test]
    fn test_dump_output() {
        let config = CeleryConfig::default();
        let output = config.dump();

        assert!(output.starts_with("CeleRS Configuration:\n"));
        assert!(output.contains("broker_url:"));
        assert!(output.contains("task_serializer:"));
        assert!(output.contains("worker_concurrency:"));
        assert!(output.contains("result_expires:"));
        assert!(output.contains("task_routes:"));
        assert!(output.contains("beat_schedule:"));
    }

    // `Display` for errors shows `[field] message`, plus the suggestion only
    // when present; warnings show `[field] message`.
    #[test]
    fn test_config_validation_display() {
        let error = ConfigError {
            field: "broker_url".to_string(),
            message: "invalid URL".to_string(),
            suggestion: Some("use redis://".to_string()),
        };
        let display = format!("{}", error);
        assert!(display.contains("[broker_url]"));
        assert!(display.contains("invalid URL"));
        assert!(display.contains("suggestion: use redis://"));

        let error_no_suggestion = ConfigError {
            field: "concurrency".to_string(),
            message: "must be positive".to_string(),
            suggestion: None,
        };
        let display2 = format!("{}", error_no_suggestion);
        assert!(display2.contains("[concurrency]"));
        assert!(display2.contains("must be positive"));
        assert!(!display2.contains("suggestion"));

        let warning = ConfigWarning {
            field: "prefetch".to_string(),
            message: "value too high".to_string(),
        };
        let display3 = format!("{}", warning);
        assert!(display3.contains("[prefetch]"));
        assert!(display3.contains("value too high"));
    }

    // A concurrency of 2048 should raise a warning on `worker_concurrency`.
    #[test]
    fn test_validate_detailed_high_concurrency_warning() {
        let config = CeleryConfig {
            worker_concurrency: 2048,
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(validation.has_warnings());
        assert!(validation
            .warnings
            .iter()
            .any(|w| w.field == "worker_concurrency"));
    }

    // A prefetch multiplier of zero should raise a warning on `worker_prefetch_multiplier`.
    #[test]
    fn test_validate_detailed_prefetch_zero_warning() {
        let config = CeleryConfig {
            worker_prefetch_multiplier: 0,
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(validation.has_warnings());
        assert!(validation
            .warnings
            .iter()
            .any(|w| w.field == "worker_prefetch_multiplier"));
    }

    // A retry limit of 200 should raise a warning on `task_max_retries`.
    #[test]
    fn test_validate_detailed_high_retries_warning() {
        let config = CeleryConfig {
            task_max_retries: 200,
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(validation.has_warnings());
        assert!(validation
            .warnings
            .iter()
            .any(|w| w.field == "task_max_retries"));
    }

    // An `ftp://` result backend should be reported as an error on `result_backend`.
    #[test]
    fn test_validate_detailed_invalid_result_backend() {
        let config = CeleryConfig {
            result_backend: Some("ftp://invalid".to_string()),
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(!validation.is_valid());
        assert!(validation
            .errors
            .iter()
            .any(|e| e.field == "result_backend"));
    }

    // An unknown result serializer should be reported as an error on `result_serializer`.
    #[test]
    fn test_validate_detailed_invalid_result_serializer() {
        let config = CeleryConfig {
            result_serializer: "xml".to_string(),
            ..Default::default()
        };
        let validation = config.validate_detailed();
        assert!(!validation.is_valid());
        assert!(validation
            .errors
            .iter()
            .any(|e| e.field == "result_serializer"));
    }

    // The counters and predicates on `ConfigValidation` should track entries
    // added through the private `add_error`/`add_warning` helpers.
    #[test]
    fn test_config_validation_counts() {
        let mut validation = ConfigValidation::new();
        assert!(validation.is_valid());
        assert!(!validation.has_warnings());
        assert_eq!(validation.error_count(), 0);
        assert_eq!(validation.warning_count(), 0);

        validation.add_error("f1", "e1", None);
        validation.add_warning("f2", "w1");
        assert!(!validation.is_valid());
        assert!(validation.has_warnings());
        assert_eq!(validation.error_count(), 1);
        assert_eq!(validation.warning_count(), 1);
    }
}