1use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::time::Duration;
27
/// Celery-style configuration: broker, result backend, serialization, worker,
/// routing, retry and beat-schedule settings in one serde-(de)serializable struct.
///
/// Fields carrying `#[serde(default ...)]` may be omitted from the input; keys
/// that match no named field are collected into [`CeleryConfig::custom`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CeleryConfig {
    /// Broker connection URL. It has no serde default.
    pub broker_url: String,

    /// Result backend URL; left out of serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result_backend: Option<String>,

    /// Serializer name for tasks; defaults to `default_serializer()` ("json").
    #[serde(default = "default_serializer")]
    pub task_serializer: String,

    /// Serializer name for results; defaults to `default_serializer()` ("json").
    #[serde(default = "default_serializer")]
    pub result_serializer: String,

    /// Accepted content types; defaults to `default_accept_content()`
    /// (`["json", "msgpack"]`).
    #[serde(default = "default_accept_content")]
    pub accept_content: Vec<String>,

    /// Timezone name; defaults to `default_timezone()` ("UTC").
    #[serde(default = "default_timezone")]
    pub timezone: String,

    /// UTC flag; defaults to `true` via `default_true()`.
    #[serde(default = "default_true")]
    pub enable_utc: bool,

    /// "Track started" flag; `false` when absent.
    #[serde(default)]
    pub task_track_started: bool,

    /// "Send sent event" flag; `false` when absent.
    #[serde(default)]
    pub task_send_sent_event: bool,

    /// "Acks late" flag; `false` when absent.
    #[serde(default)]
    pub task_acks_late: bool,

    /// "Reject on worker lost" flag; `false` when absent.
    #[serde(default)]
    pub task_reject_on_worker_lost: bool,

    /// Worker concurrency; defaults to `default_concurrency()`
    /// (the value of `num_cpus::get()`).
    #[serde(default = "default_concurrency")]
    pub worker_concurrency: usize,

    /// Prefetch multiplier; defaults to `default_prefetch_multiplier()` (4).
    #[serde(default = "default_prefetch_multiplier")]
    pub worker_prefetch_multiplier: usize,

    /// Optional cap on tasks per worker child; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worker_max_tasks_per_child: Option<usize>,

    /// Optional cap on memory per worker child; skipped on output when `None`.
    // NOTE(review): the unit is not visible in this file — confirm before relying on it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub worker_max_memory_per_child: Option<usize>,

    /// Worker heartbeat, printed with an `s` (seconds) suffix by `dump`;
    /// defaults to `default_heartbeat_interval()` (10).
    #[serde(default = "default_heartbeat_interval")]
    pub worker_heartbeat: u64,

    /// Default queue name; defaults to `default_queue_name()` ("celery").
    #[serde(default = "default_queue_name")]
    pub task_default_queue: String,

    /// Default exchange name; shares `default_queue_name()` ("celery") as its default.
    #[serde(default = "default_queue_name")]
    pub task_default_exchange: String,

    /// Default exchange type; defaults to `default_exchange_type()` ("direct").
    #[serde(default = "default_exchange_type")]
    pub task_default_exchange_type: String,

    /// Default routing key; shares `default_queue_name()` ("celery") as its default.
    #[serde(default = "default_queue_name")]
    pub task_default_routing_key: String,

    /// Per-task routes keyed by task name; empty when absent.
    #[serde(default)]
    pub task_routes: HashMap<String, TaskRoute>,

    /// Optional hard time limit, interpreted as seconds by
    /// `task_time_limit_duration`; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_time_limit: Option<u64>,

    /// Optional soft time limit, interpreted as seconds by
    /// `task_soft_time_limit_duration`; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub task_soft_time_limit: Option<u64>,

    /// Default retry delay, printed with an `s` (seconds) suffix by `dump`;
    /// defaults to `default_retry_delay()` (180).
    #[serde(default = "default_retry_delay")]
    pub task_default_retry_delay: u64,

    /// Maximum retries; defaults to `default_max_retries()` (3).
    #[serde(default = "default_max_retries")]
    pub task_max_retries: u32,

    /// Result expiry, interpreted as seconds by `result_expires_duration`;
    /// defaults to `default_result_expires()` (86400).
    #[serde(default = "default_result_expires")]
    pub result_expires: u64,

    /// Optional result compression algorithm name; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result_compression: Option<String>,

    /// Compression threshold, printed with a `bytes` suffix by `dump`;
    /// defaults to `default_compression_threshold()` (1024 * 1024).
    #[serde(default = "default_compression_threshold")]
    pub result_compression_threshold: usize,

    /// Per-task settings keyed by task name; empty when absent.
    #[serde(default)]
    pub task_annotations: HashMap<String, TaskConfig>,

    /// Broker transport options; `BrokerTransport::default()` when absent.
    #[serde(default)]
    pub broker_transport_options: BrokerTransport,

    /// Result backend transport options; `BackendTransport::default()` when absent.
    #[serde(default)]
    pub result_backend_transport_options: BackendTransport,

    /// Beat schedule entries keyed by entry name; empty when absent.
    #[serde(default)]
    pub beat_schedule: HashMap<String, BeatSchedule>,

    /// Catch-all for keys not matched by any field above; flattened back into
    /// the top-level map on serialization.
    #[serde(flatten)]
    pub custom: HashMap<String, serde_json::Value>,
}
162
/// Routing entry for one task; used as the value type of
/// `CeleryConfig::task_routes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRoute {
    /// Queue name. This is the only field without `skip_serializing_if`.
    pub queue: String,

    /// Optional exchange name; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub exchange: Option<String>,

    /// Optional routing key; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub routing_key: Option<String>,

    /// Optional priority; skipped on output when `None`.
    // NOTE(review): the valid range beyond `u8` is not visible here — confirm against the broker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<u8>,
}
181
/// Per-task settings; used as the value type of `CeleryConfig::task_annotations`
/// and as `BeatSchedule::options`.
///
/// Every field is optional and is left out of serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskConfig {
    /// Optional hard time limit.
    // NOTE(review): presumably seconds, like `CeleryConfig::task_time_limit` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time_limit: Option<u64>,

    /// Optional soft time limit.
    // NOTE(review): presumably seconds — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub soft_time_limit: Option<u64>,

    /// Optional maximum number of retries.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_retries: Option<u32>,

    /// Optional default retry delay.
    // NOTE(review): presumably seconds — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_retry_delay: Option<u64>,

    /// Optional priority.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub priority: Option<u8>,

    /// Optional queue name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub queue: Option<String>,

    /// Optional "acks late" flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub acks_late: Option<bool>,

    /// Optional "track started" flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub track_started: Option<bool>,

    /// Optional rate limit, kept as an unparsed string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rate_limit: Option<String>,
}
221
/// Broker transport options; the type of `CeleryConfig::broker_transport_options`.
///
/// Derives `Default`, so every field starts as `None` and `custom` starts empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BrokerTransport {
    /// Optional visibility timeout; skipped on output when `None`.
    // NOTE(review): the unit is not visible in this file — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub visibility_timeout: Option<u64>,

    /// Optional maximum number of connections; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<usize>,

    /// Optional maximum number of retries; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_retries: Option<u32>,

    /// Optional retry interval start; skipped on output when `None`.
    // NOTE(review): the unit is not visible in this file — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub interval_start: Option<u64>,

    /// Optional retry interval maximum; skipped on output when `None`.
    // NOTE(review): the unit is not visible in this file — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub interval_max: Option<u64>,

    /// Catch-all for keys not matched by any field above; flattened back into
    /// the same map on serialization.
    #[serde(flatten)]
    pub custom: HashMap<String, serde_json::Value>,
}
249
/// Result backend transport options; the type of
/// `CeleryConfig::result_backend_transport_options`.
///
/// Derives `Default`, so every field starts as `None` and `custom` starts empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BackendTransport {
    /// Optional result expiry; skipped on output when `None`.
    // NOTE(review): presumably seconds, like `CeleryConfig::result_expires` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result_expires: Option<u64>,

    /// Optional maximum number of connections; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_connections: Option<usize>,

    /// Catch-all for keys not matched by any field above; flattened back into
    /// the same map on serialization.
    #[serde(flatten)]
    pub custom: HashMap<String, serde_json::Value>,
}
265
/// One beat schedule entry; used as the value type of
/// `CeleryConfig::beat_schedule`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BeatSchedule {
    /// Task name (required).
    pub task: String,

    /// Schedule definition (required).
    pub schedule: ScheduleDefinition,

    /// Optional positional arguments as JSON values; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub args: Option<Vec<serde_json::Value>>,

    /// Optional keyword arguments as JSON values; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub kwargs: Option<HashMap<String, serde_json::Value>>,

    /// Optional per-entry task settings; skipped on output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub options: Option<TaskConfig>,
}
287
/// Schedule of a beat entry.
///
/// The enum is `#[serde(untagged)]`: there is no discriminator in the data, and
/// serde tries the variants in declaration order when deserializing, so the
/// order below is part of the behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ScheduleDefinition {
    /// A plain string value.
    // NOTE(review): presumably a crontab expression; nothing in this file parses it — confirm.
    Crontab(String),

    /// A plain unsigned integer value.
    // NOTE(review): presumably an interval in seconds — confirm against the scheduler.
    Interval(u64),

    /// A map with a `type` key and an arbitrary JSON `value`.
    Complex {
        /// Serialized under the key `type`.
        #[serde(rename = "type")]
        schedule_type: String,

        /// Arbitrary JSON payload.
        value: serde_json::Value,
    },
}
308
309impl Default for CeleryConfig {
310 fn default() -> Self {
311 Self {
312 broker_url: "redis://localhost:6379/0".to_string(),
313 result_backend: Some("redis://localhost:6379/1".to_string()),
314 task_serializer: default_serializer(),
315 result_serializer: default_serializer(),
316 accept_content: default_accept_content(),
317 timezone: default_timezone(),
318 enable_utc: true,
319 task_track_started: false,
320 task_send_sent_event: false,
321 task_acks_late: false,
322 task_reject_on_worker_lost: false,
323 worker_concurrency: default_concurrency(),
324 worker_prefetch_multiplier: default_prefetch_multiplier(),
325 worker_max_tasks_per_child: None,
326 worker_max_memory_per_child: None,
327 worker_heartbeat: default_heartbeat_interval(),
328 task_default_queue: default_queue_name(),
329 task_default_exchange: default_queue_name(),
330 task_default_exchange_type: default_exchange_type(),
331 task_default_routing_key: default_queue_name(),
332 task_routes: HashMap::new(),
333 task_time_limit: None,
334 task_soft_time_limit: None,
335 task_default_retry_delay: default_retry_delay(),
336 task_max_retries: default_max_retries(),
337 result_expires: default_result_expires(),
338 result_compression: None,
339 result_compression_threshold: default_compression_threshold(),
340 task_annotations: HashMap::new(),
341 broker_transport_options: BrokerTransport::default(),
342 result_backend_transport_options: BackendTransport::default(),
343 beat_schedule: HashMap::new(),
344 custom: HashMap::new(),
345 }
346 }
347}
348
349impl CeleryConfig {
350 #[inline]
352 pub fn new(broker_url: impl Into<String>) -> Self {
353 Self {
354 broker_url: broker_url.into(),
355 ..Default::default()
356 }
357 }
358
359 #[inline]
361 #[must_use]
362 pub fn with_broker_url(mut self, url: impl Into<String>) -> Self {
363 self.broker_url = url.into();
364 self
365 }
366
367 #[inline]
369 #[must_use]
370 pub fn with_result_backend(mut self, url: impl Into<String>) -> Self {
371 self.result_backend = Some(url.into());
372 self
373 }
374
375 #[inline]
377 #[must_use]
378 pub fn with_task_serializer(mut self, serializer: impl Into<String>) -> Self {
379 self.task_serializer = serializer.into();
380 self
381 }
382
383 #[inline]
385 #[must_use]
386 pub fn with_result_serializer(mut self, serializer: impl Into<String>) -> Self {
387 self.result_serializer = serializer.into();
388 self
389 }
390
391 #[inline]
393 #[must_use]
394 pub fn with_accept_content(mut self, content: Vec<String>) -> Self {
395 self.accept_content = content;
396 self
397 }
398
399 #[inline]
401 #[must_use]
402 pub fn with_timezone(mut self, tz: impl Into<String>) -> Self {
403 self.timezone = tz.into();
404 self
405 }
406
407 #[must_use]
409 pub const fn with_enable_utc(mut self, enabled: bool) -> Self {
410 self.enable_utc = enabled;
411 self
412 }
413
414 #[must_use]
416 pub const fn with_worker_concurrency(mut self, concurrency: usize) -> Self {
417 self.worker_concurrency = concurrency;
418 self
419 }
420
421 #[must_use]
423 pub const fn with_prefetch_multiplier(mut self, multiplier: usize) -> Self {
424 self.worker_prefetch_multiplier = multiplier;
425 self
426 }
427
428 #[inline]
430 #[must_use]
431 pub fn with_default_queue(mut self, queue: impl Into<String>) -> Self {
432 self.task_default_queue = queue.into();
433 self
434 }
435
436 #[inline]
438 #[must_use]
439 pub fn with_task_route(mut self, task: impl Into<String>, route: TaskRoute) -> Self {
440 self.task_routes.insert(task.into(), route);
441 self
442 }
443
444 #[inline]
446 #[must_use]
447 pub fn with_task_annotation(mut self, task: impl Into<String>, config: TaskConfig) -> Self {
448 self.task_annotations.insert(task.into(), config);
449 self
450 }
451
452 #[must_use]
454 pub const fn with_result_expires(mut self, expires: u64) -> Self {
455 self.result_expires = expires;
456 self
457 }
458
459 #[inline]
461 #[must_use]
462 pub fn with_result_compression(mut self, algorithm: impl Into<String>) -> Self {
463 self.result_compression = Some(algorithm.into());
464 self
465 }
466
467 #[must_use]
469 pub const fn with_compression_threshold(mut self, threshold: usize) -> Self {
470 self.result_compression_threshold = threshold;
471 self
472 }
473
474 #[inline]
476 #[must_use]
477 pub fn with_beat_schedule(mut self, name: impl Into<String>, schedule: BeatSchedule) -> Self {
478 self.beat_schedule.insert(name.into(), schedule);
479 self
480 }
481
482 #[inline]
484 #[must_use]
485 pub fn get_task_config(&self, task_name: &str) -> Option<&TaskConfig> {
486 self.task_annotations.get(task_name)
487 }
488
489 #[inline]
491 #[must_use]
492 pub fn get_task_route(&self, task_name: &str) -> Option<&TaskRoute> {
493 self.task_routes.get(task_name)
494 }
495
496 #[inline]
498 #[must_use]
499 pub const fn result_expires_duration(&self) -> Duration {
500 Duration::from_secs(self.result_expires)
501 }
502
503 #[inline]
505 #[must_use]
506 pub fn task_time_limit_duration(&self) -> Option<Duration> {
507 self.task_time_limit.map(Duration::from_secs)
508 }
509
510 #[inline]
512 #[must_use]
513 pub fn task_soft_time_limit_duration(&self) -> Option<Duration> {
514 self.task_soft_time_limit.map(Duration::from_secs)
515 }
516
517 #[must_use]
522 pub fn from_env() -> Self {
523 let mut config = Self::default();
524
525 if let Ok(url) = std::env::var("CELERY_BROKER_URL") {
527 config.broker_url = url;
528 }
529 if let Ok(backend) = std::env::var("CELERY_RESULT_BACKEND") {
530 config.result_backend = Some(backend);
531 }
532 if let Ok(serializer) = std::env::var("CELERY_TASK_SERIALIZER") {
533 config.task_serializer = serializer;
534 }
535 if let Ok(serializer) = std::env::var("CELERY_RESULT_SERIALIZER") {
536 config.result_serializer = serializer;
537 }
538 if let Ok(tz) = std::env::var("CELERY_TIMEZONE") {
539 config.timezone = tz;
540 }
541 if let Ok(queue) = std::env::var("CELERY_DEFAULT_QUEUE") {
542 config.task_default_queue = queue;
543 }
544 if let Ok(exchange) = std::env::var("CELERY_DEFAULT_EXCHANGE") {
545 config.task_default_exchange = exchange;
546 }
547 if let Ok(exchange_type) = std::env::var("CELERY_DEFAULT_EXCHANGE_TYPE") {
548 config.task_default_exchange_type = exchange_type;
549 }
550 if let Ok(routing_key) = std::env::var("CELERY_DEFAULT_ROUTING_KEY") {
551 config.task_default_routing_key = routing_key;
552 }
553
554 if let Some(val) = parse_env_bool("CELERY_ENABLE_UTC") {
556 config.enable_utc = val;
557 }
558 if let Some(val) = parse_env_bool("CELERY_TASK_TRACK_STARTED") {
559 config.task_track_started = val;
560 }
561 if let Some(val) = parse_env_bool("CELERY_TASK_SEND_SENT_EVENT") {
562 config.task_send_sent_event = val;
563 }
564 if let Some(val) = parse_env_bool("CELERY_TASK_ACKS_LATE") {
565 config.task_acks_late = val;
566 }
567 if let Some(val) = parse_env_bool("CELERY_TASK_REJECT_ON_WORKER_LOST") {
568 config.task_reject_on_worker_lost = val;
569 }
570
571 if let Ok(concurrency) = std::env::var("CELERYD_CONCURRENCY") {
573 if let Ok(val) = concurrency.parse() {
574 config.worker_concurrency = val;
575 }
576 }
577 if let Some(val) = parse_env_usize("CELERYD_PREFETCH_MULTIPLIER") {
578 config.worker_prefetch_multiplier = val;
579 }
580 if let Some(val) = parse_env_usize("CELERYD_MAX_TASKS_PER_CHILD") {
581 config.worker_max_tasks_per_child = Some(val);
582 }
583 if let Some(val) = parse_env_usize("CELERYD_MAX_MEMORY_PER_CHILD") {
584 config.worker_max_memory_per_child = Some(val);
585 }
586
587 if let Some(val) = parse_env_u64("CELERY_TASK_TIME_LIMIT") {
589 config.task_time_limit = Some(val);
590 }
591 if let Some(val) = parse_env_u64("CELERY_TASK_SOFT_TIME_LIMIT") {
592 config.task_soft_time_limit = Some(val);
593 }
594 if let Some(val) = parse_env_u64("CELERY_TASK_DEFAULT_RETRY_DELAY") {
595 config.task_default_retry_delay = val;
596 }
597 if let Some(val) = parse_env_u32("CELERY_TASK_MAX_RETRIES") {
598 config.task_max_retries = val;
599 }
600 if let Some(val) = parse_env_u64("CELERY_RESULT_EXPIRES") {
601 config.result_expires = val;
602 }
603
604 config
605 }
606
607 pub fn validate_detailed(&self) -> ConfigValidation {
609 let mut validation = ConfigValidation::new();
610
611 if self.broker_url.is_empty() {
613 validation.add_error(
614 "broker_url",
615 "broker URL is required",
616 Some("set CELERY_BROKER_URL environment variable".to_string()),
617 );
618 } else if !self.broker_url.starts_with("redis://")
619 && !self.broker_url.starts_with("rediss://")
620 && !self.broker_url.starts_with("amqp://")
621 && !self.broker_url.starts_with("amqps://")
622 && !self.broker_url.starts_with("sqs://")
623 && !self.broker_url.starts_with("postgres://")
624 && !self.broker_url.starts_with("postgresql://")
625 && !self.broker_url.starts_with("mysql://")
626 {
627 validation.add_error(
628 "broker_url",
629 format!("unrecognized broker URL scheme: {}", self.broker_url),
630 Some("use redis://, amqp://, sqs://, postgres://, or mysql://".to_string()),
631 );
632 }
633
634 if let Some(ref url) = self.result_backend {
636 if !url.starts_with("redis://")
637 && !url.starts_with("rediss://")
638 && !url.starts_with("postgres://")
639 && !url.starts_with("postgresql://")
640 && !url.starts_with("mysql://")
641 && !url.starts_with("grpc://")
642 {
643 validation.add_error(
644 "result_backend",
645 format!("unrecognized result backend URL scheme: {}", url),
646 Some("use redis://, postgres://, mysql://, or grpc://".to_string()),
647 );
648 }
649 }
650
651 let valid_serializers = ["json", "msgpack", "yaml", "pickle", "bson", "protobuf"];
653 if !valid_serializers.contains(&self.task_serializer.as_str()) {
654 validation.add_error(
655 "task_serializer",
656 format!("unknown serializer: {}", self.task_serializer),
657 Some(format!("use one of: {}", valid_serializers.join(", "))),
658 );
659 }
660 if !valid_serializers.contains(&self.result_serializer.as_str()) {
661 validation.add_error(
662 "result_serializer",
663 format!("unknown serializer: {}", self.result_serializer),
664 Some(format!("use one of: {}", valid_serializers.join(", "))),
665 );
666 }
667
668 if self.worker_concurrency == 0 {
670 validation.add_error(
671 "worker_concurrency",
672 "concurrency must be at least 1",
673 Some("set CELERYD_CONCURRENCY to a positive integer".to_string()),
674 );
675 }
676 if self.worker_concurrency > 1024 {
677 validation.add_warning(
678 "worker_concurrency",
679 format!(
680 "high concurrency value ({}), may cause resource exhaustion",
681 self.worker_concurrency
682 ),
683 );
684 }
685
686 if let (Some(hard), Some(soft)) = (self.task_time_limit, self.task_soft_time_limit) {
688 if soft >= hard {
689 validation.add_warning(
690 "task_soft_time_limit",
691 "soft time limit should be less than hard time limit",
692 );
693 }
694 }
695
696 if self.worker_prefetch_multiplier == 0 {
698 validation.add_warning(
699 "worker_prefetch_multiplier",
700 "prefetch multiplier of 0 disables prefetching, consider setting to 1",
701 );
702 }
703
704 if self.task_max_retries > 100 {
706 validation.add_warning(
707 "task_max_retries",
708 format!(
709 "high max retries ({}), may cause infinite retry loops",
710 self.task_max_retries
711 ),
712 );
713 }
714
715 validation
716 }
717
718 pub fn to_env_vars(&self) -> Vec<(String, String)> {
720 let mut vars = Vec::new();
721
722 vars.push(("CELERY_BROKER_URL".to_string(), self.broker_url.clone()));
723 if let Some(ref backend) = self.result_backend {
724 vars.push(("CELERY_RESULT_BACKEND".to_string(), backend.clone()));
725 }
726 vars.push((
727 "CELERY_TASK_SERIALIZER".to_string(),
728 self.task_serializer.clone(),
729 ));
730 vars.push((
731 "CELERY_RESULT_SERIALIZER".to_string(),
732 self.result_serializer.clone(),
733 ));
734 vars.push(("CELERY_TIMEZONE".to_string(), self.timezone.clone()));
735 vars.push(("CELERY_ENABLE_UTC".to_string(), self.enable_utc.to_string()));
736 vars.push((
737 "CELERY_TASK_TRACK_STARTED".to_string(),
738 self.task_track_started.to_string(),
739 ));
740 vars.push((
741 "CELERY_TASK_SEND_SENT_EVENT".to_string(),
742 self.task_send_sent_event.to_string(),
743 ));
744 vars.push((
745 "CELERY_TASK_ACKS_LATE".to_string(),
746 self.task_acks_late.to_string(),
747 ));
748 vars.push((
749 "CELERY_TASK_REJECT_ON_WORKER_LOST".to_string(),
750 self.task_reject_on_worker_lost.to_string(),
751 ));
752 vars.push((
753 "CELERYD_CONCURRENCY".to_string(),
754 self.worker_concurrency.to_string(),
755 ));
756 vars.push((
757 "CELERYD_PREFETCH_MULTIPLIER".to_string(),
758 self.worker_prefetch_multiplier.to_string(),
759 ));
760 if let Some(val) = self.worker_max_tasks_per_child {
761 vars.push(("CELERYD_MAX_TASKS_PER_CHILD".to_string(), val.to_string()));
762 }
763 if let Some(val) = self.worker_max_memory_per_child {
764 vars.push(("CELERYD_MAX_MEMORY_PER_CHILD".to_string(), val.to_string()));
765 }
766 vars.push((
767 "CELERY_DEFAULT_QUEUE".to_string(),
768 self.task_default_queue.clone(),
769 ));
770 vars.push((
771 "CELERY_DEFAULT_EXCHANGE".to_string(),
772 self.task_default_exchange.clone(),
773 ));
774 vars.push((
775 "CELERY_DEFAULT_EXCHANGE_TYPE".to_string(),
776 self.task_default_exchange_type.clone(),
777 ));
778 vars.push((
779 "CELERY_DEFAULT_ROUTING_KEY".to_string(),
780 self.task_default_routing_key.clone(),
781 ));
782 if let Some(val) = self.task_time_limit {
783 vars.push(("CELERY_TASK_TIME_LIMIT".to_string(), val.to_string()));
784 }
785 if let Some(val) = self.task_soft_time_limit {
786 vars.push(("CELERY_TASK_SOFT_TIME_LIMIT".to_string(), val.to_string()));
787 }
788 vars.push((
789 "CELERY_TASK_DEFAULT_RETRY_DELAY".to_string(),
790 self.task_default_retry_delay.to_string(),
791 ));
792 vars.push((
793 "CELERY_TASK_MAX_RETRIES".to_string(),
794 self.task_max_retries.to_string(),
795 ));
796 vars.push((
797 "CELERY_RESULT_EXPIRES".to_string(),
798 self.result_expires.to_string(),
799 ));
800
801 vars
802 }
803
804 pub fn dump(&self) -> String {
806 let mut output = String::from("CeleRS Configuration:\n");
807 output.push_str(&format!(" broker_url: {}\n", self.broker_url));
808 output.push_str(&format!(" result_backend: {:?}\n", self.result_backend));
809 output.push_str(&format!(" task_serializer: {}\n", self.task_serializer));
810 output.push_str(&format!(
811 " result_serializer: {}\n",
812 self.result_serializer
813 ));
814 output.push_str(&format!(" timezone: {}\n", self.timezone));
815 output.push_str(&format!(" enable_utc: {}\n", self.enable_utc));
816 output.push_str(&format!(
817 " task_track_started: {}\n",
818 self.task_track_started
819 ));
820 output.push_str(&format!(
821 " task_send_sent_event: {}\n",
822 self.task_send_sent_event
823 ));
824 output.push_str(&format!(" task_acks_late: {}\n", self.task_acks_late));
825 output.push_str(&format!(
826 " task_reject_on_worker_lost: {}\n",
827 self.task_reject_on_worker_lost
828 ));
829 output.push_str(&format!(
830 " worker_concurrency: {}\n",
831 self.worker_concurrency
832 ));
833 output.push_str(&format!(
834 " worker_prefetch_multiplier: {}\n",
835 self.worker_prefetch_multiplier
836 ));
837 output.push_str(&format!(
838 " worker_max_tasks_per_child: {:?}\n",
839 self.worker_max_tasks_per_child
840 ));
841 output.push_str(&format!(
842 " worker_max_memory_per_child: {:?}\n",
843 self.worker_max_memory_per_child
844 ));
845 output.push_str(&format!(" worker_heartbeat: {}s\n", self.worker_heartbeat));
846 output.push_str(&format!(
847 " task_default_queue: {}\n",
848 self.task_default_queue
849 ));
850 output.push_str(&format!(
851 " task_default_exchange: {}\n",
852 self.task_default_exchange
853 ));
854 output.push_str(&format!(
855 " task_default_exchange_type: {}\n",
856 self.task_default_exchange_type
857 ));
858 output.push_str(&format!(
859 " task_default_routing_key: {}\n",
860 self.task_default_routing_key
861 ));
862 output.push_str(&format!(" task_time_limit: {:?}\n", self.task_time_limit));
863 output.push_str(&format!(
864 " task_soft_time_limit: {:?}\n",
865 self.task_soft_time_limit
866 ));
867 output.push_str(&format!(
868 " task_default_retry_delay: {}s\n",
869 self.task_default_retry_delay
870 ));
871 output.push_str(&format!(" task_max_retries: {}\n", self.task_max_retries));
872 output.push_str(&format!(" result_expires: {}s\n", self.result_expires));
873 output.push_str(&format!(
874 " result_compression: {:?}\n",
875 self.result_compression
876 ));
877 output.push_str(&format!(
878 " result_compression_threshold: {} bytes\n",
879 self.result_compression_threshold
880 ));
881 output.push_str(&format!(
882 " task_routes: {} route(s)\n",
883 self.task_routes.len()
884 ));
885 output.push_str(&format!(
886 " task_annotations: {} annotation(s)\n",
887 self.task_annotations.len()
888 ));
889 output.push_str(&format!(
890 " beat_schedule: {} schedule(s)\n",
891 self.beat_schedule.len()
892 ));
893 output
894 }
895
896 pub fn validate(&self) -> Result<(), String> {
902 if self.broker_url.is_empty() {
903 return Err("broker_url is required".to_string());
904 }
905
906 if self.worker_concurrency == 0 {
907 return Err("worker_concurrency must be greater than 0".to_string());
908 }
909
910 if !["json", "msgpack", "yaml", "pickle"].contains(&self.task_serializer.as_str()) {
911 return Err(format!(
912 "Unsupported task_serializer: {}",
913 self.task_serializer
914 ));
915 }
916
917 Ok(())
918 }
919}
920
/// Serde default for the task and result serializer fields: `"json"`.
fn default_serializer() -> String {
    String::from("json")
}
925
/// Serde default for `accept_content`: `"json"` followed by `"msgpack"`.
fn default_accept_content() -> Vec<String> {
    ["json", "msgpack"]
        .iter()
        .map(|content_type| content_type.to_string())
        .collect()
}
929
/// Serde default for `timezone`: `"UTC"`.
fn default_timezone() -> String {
    String::from("UTC")
}
933
/// Serde default for boolean fields that should be on unless stated otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
937
/// Serde default for `worker_concurrency`: the CPU count reported by
/// `num_cpus::get()`.
fn default_concurrency() -> usize {
    num_cpus::get()
}
941
/// Serde default for `worker_prefetch_multiplier`.
fn default_prefetch_multiplier() -> usize {
    const PREFETCH_MULTIPLIER: usize = 4;
    PREFETCH_MULTIPLIER
}
945
/// Serde default for `worker_heartbeat`.
fn default_heartbeat_interval() -> u64 {
    const HEARTBEAT_INTERVAL: u64 = 10;
    HEARTBEAT_INTERVAL
}
949
/// Serde default shared by the default queue, exchange and routing key
/// fields: `"celery"`.
fn default_queue_name() -> String {
    String::from("celery")
}
953
/// Serde default for `task_default_exchange_type`: `"direct"`.
fn default_exchange_type() -> String {
    String::from("direct")
}
957
/// Serde default for `task_default_retry_delay`: 180 (three minutes when read
/// as seconds).
fn default_retry_delay() -> u64 {
    const RETRY_DELAY: u64 = 3 * 60;
    RETRY_DELAY
}
961
/// Serde default for `task_max_retries`.
fn default_max_retries() -> u32 {
    const MAX_RETRIES: u32 = 3;
    MAX_RETRIES
}
965
/// Serde default for `result_expires`: 86400 (one day when read as seconds).
fn default_result_expires() -> u64 {
    const RESULT_EXPIRES: u64 = 24 * 60 * 60;
    RESULT_EXPIRES
}
969
/// Serde default for `result_compression_threshold`: 1 MiB (1024 * 1024).
fn default_compression_threshold() -> usize {
    const ONE_MIB: usize = 1 << 20;
    ONE_MIB
}
973
/// Reads environment variable `var` and interprets it as a boolean.
///
/// Recognizes `true`/`1`/`yes`/`on` and `false`/`0`/`no`/`off`, ignoring ASCII
/// case and surrounding whitespace (so a value such as `"true "` coming from an
/// env file is not silently dropped).
///
/// Returns `None` when the variable is unset, not valid Unicode, or holds any
/// other text.
fn parse_env_bool(var: &str) -> Option<bool> {
    const TRUTHY: [&str; 4] = ["true", "1", "yes", "on"];
    const FALSY: [&str; 4] = ["false", "0", "no", "off"];

    let raw = std::env::var(var).ok()?;
    let value = raw.trim();

    // ASCII-only comparison avoids allocating a lowercased copy of the value.
    if TRUTHY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        Some(true)
    } else if FALSY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        Some(false)
    } else {
        None
    }
}
988
/// Reads environment variable `var` and parses it as a `u64`.
///
/// Surrounding whitespace is ignored, so a value such as `"300 "` is not
/// silently discarded. Returns `None` when the variable is unset, not valid
/// Unicode, or not a valid `u64`.
fn parse_env_u64(var: &str) -> Option<u64> {
    let raw = std::env::var(var).ok()?;
    raw.trim().parse().ok()
}
992
/// Reads environment variable `var` and parses it as a `u32`.
///
/// Surrounding whitespace is ignored, so a value such as `"5 "` is not
/// silently discarded. Returns `None` when the variable is unset, not valid
/// Unicode, or not a valid `u32`.
fn parse_env_u32(var: &str) -> Option<u32> {
    let raw = std::env::var(var).ok()?;
    raw.trim().parse().ok()
}
996
/// Reads environment variable `var` and parses it as a `usize`.
///
/// Surrounding whitespace is ignored, so a value such as `"8 "` is not
/// silently discarded. Returns `None` when the variable is unset, not valid
/// Unicode, or not a valid `usize`.
fn parse_env_usize(var: &str) -> Option<usize> {
    let raw = std::env::var(var).ok()?;
    raw.trim().parse().ok()
}
1000
/// Outcome of `CeleryConfig::validate_detailed`: the errors and warnings that
/// were found. Derives `Default`, so both lists start empty.
#[derive(Debug, Clone, Default)]
pub struct ConfigValidation {
    /// Problems that make the configuration invalid (see `is_valid`).
    pub errors: Vec<ConfigError>,
    /// Findings that do not affect `is_valid`.
    pub warnings: Vec<ConfigWarning>,
}
1011
1012impl ConfigValidation {
1013 pub fn new() -> Self {
1015 Self::default()
1016 }
1017
1018 pub fn is_valid(&self) -> bool {
1020 self.errors.is_empty()
1021 }
1022
1023 pub fn has_warnings(&self) -> bool {
1025 !self.warnings.is_empty()
1026 }
1027
1028 pub fn error_count(&self) -> usize {
1030 self.errors.len()
1031 }
1032
1033 pub fn warning_count(&self) -> usize {
1035 self.warnings.len()
1036 }
1037
1038 fn add_error(
1039 &mut self,
1040 field: impl Into<String>,
1041 message: impl Into<String>,
1042 suggestion: Option<String>,
1043 ) {
1044 self.errors.push(ConfigError {
1045 field: field.into(),
1046 message: message.into(),
1047 suggestion,
1048 });
1049 }
1050
1051 fn add_warning(&mut self, field: impl Into<String>, message: impl Into<String>) {
1052 self.warnings.push(ConfigWarning {
1053 field: field.into(),
1054 message: message.into(),
1055 });
1056 }
1057}
1058
/// A validation error attached to one configuration field.
///
/// Implements [`std::error::Error`], so it can be boxed as
/// `Box<dyn std::error::Error>` or propagated with `?`.
#[derive(Debug, Clone)]
pub struct ConfigError {
    /// Name of the offending configuration field.
    pub field: String,
    /// Description of the problem.
    pub message: String,
    /// Optional hint on how to fix the problem.
    pub suggestion: Option<String>,
}

impl std::fmt::Display for ConfigError {
    /// Formats as `[field] message`, followed by ` (suggestion: ...)` when a
    /// suggestion is present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}] {}", self.field, self.message)?;
        match self.suggestion.as_deref() {
            Some(suggestion) => write!(f, " (suggestion: {})", suggestion),
            None => Ok(()),
        }
    }
}

impl std::error::Error for ConfigError {}
1079
/// A validation warning attached to one configuration field.
#[derive(Debug, Clone)]
pub struct ConfigWarning {
    /// Name of the configuration field the warning refers to.
    pub field: String,
    /// Description of the concern.
    pub message: String,
}

impl std::fmt::Display for ConfigWarning {
    /// Formats as `[field] message`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { field, message } = self;
        f.write_fmt(format_args!("[{}] {}", field, message))
    }
}
1094
1095#[cfg(test)]
1096mod tests {
1097 use super::*;
1098 use std::sync::Mutex;
1099
1100 static ENV_LOCK: Mutex<()> = Mutex::new(());
1102
    /// Sets environment variable `key` to `val` for the current process.
    fn set_env(key: &str, val: &str) {
        // SAFETY: the tests in this module that call this helper take `ENV_LOCK`
        // first, so they do not mutate the environment concurrently with each other.
        // NOTE(review): confirm no other thread reads the environment meanwhile.
        unsafe { std::env::set_var(key, val) };
    }
1108
    /// Removes environment variable `key` from the current process.
    fn remove_env(key: &str) {
        // SAFETY: the tests in this module that call this helper take `ENV_LOCK`
        // first, so they do not mutate the environment concurrently with each other.
        // NOTE(review): confirm no other thread reads the environment meanwhile.
        unsafe { std::env::remove_var(key) };
    }
1113
    /// Every environment variable that `CeleryConfig::from_env` reads;
    /// `cleanup_env` removes all of them so tests start from a clean slate.
    const ALL_ENV_KEYS: &[&str] = &[
        "CELERY_BROKER_URL",
        "CELERY_RESULT_BACKEND",
        "CELERY_TASK_SERIALIZER",
        "CELERY_RESULT_SERIALIZER",
        "CELERY_TIMEZONE",
        "CELERY_DEFAULT_QUEUE",
        "CELERY_DEFAULT_EXCHANGE",
        "CELERY_DEFAULT_EXCHANGE_TYPE",
        "CELERY_DEFAULT_ROUTING_KEY",
        "CELERY_ENABLE_UTC",
        "CELERY_TASK_TRACK_STARTED",
        "CELERY_TASK_SEND_SENT_EVENT",
        "CELERY_TASK_ACKS_LATE",
        "CELERY_TASK_REJECT_ON_WORKER_LOST",
        "CELERYD_CONCURRENCY",
        "CELERYD_PREFETCH_MULTIPLIER",
        "CELERYD_MAX_TASKS_PER_CHILD",
        "CELERYD_MAX_MEMORY_PER_CHILD",
        "CELERY_TASK_TIME_LIMIT",
        "CELERY_TASK_SOFT_TIME_LIMIT",
        "CELERY_TASK_DEFAULT_RETRY_DELAY",
        "CELERY_TASK_MAX_RETRIES",
        "CELERY_RESULT_EXPIRES",
    ];
1140
1141 fn cleanup_env() {
1142 for key in ALL_ENV_KEYS {
1143 remove_env(key);
1144 }
1145 }
1146
1147 #[test]
1148 fn test_default_config() {
1149 let config = CeleryConfig::default();
1150 assert_eq!(config.broker_url, "redis://localhost:6379/0");
1151 assert_eq!(config.task_serializer, "json");
1152 assert_eq!(config.timezone, "UTC");
1153 assert!(config.enable_utc);
1154 }
1155
1156 #[test]
1157 fn test_config_builder() {
1158 let config = CeleryConfig::new("redis://localhost:6379/0")
1159 .with_result_backend("redis://localhost:6379/1")
1160 .with_worker_concurrency(8)
1161 .with_default_queue("my_queue");
1162
1163 assert_eq!(config.worker_concurrency, 8);
1164 assert_eq!(config.task_default_queue, "my_queue");
1165 }
1166
1167 #[test]
1168 fn test_config_validation() {
1169 let config = CeleryConfig::default();
1170 assert!(config.validate().is_ok());
1171
1172 let invalid = CeleryConfig {
1173 broker_url: String::new(),
1174 ..Default::default()
1175 };
1176 assert!(invalid.validate().is_err());
1177 }
1178
1179 #[test]
1180 fn test_task_route() {
1181 let route = TaskRoute {
1182 queue: "high_priority".to_string(),
1183 exchange: Some("tasks".to_string()),
1184 routing_key: Some("task.high".to_string()),
1185 priority: Some(9),
1186 };
1187
1188 let config = CeleryConfig::default().with_task_route("important_task", route);
1189
1190 assert!(config.get_task_route("important_task").is_some());
1191 }
1192
1193 #[test]
1194 fn test_duration_conversions() {
1195 let config = CeleryConfig::default();
1196 assert_eq!(config.result_expires_duration(), Duration::from_secs(86400));
1197 }
1198
1199 #[test]
1200 fn test_from_env_boolean_vars() {
1201 let _guard = ENV_LOCK.lock();
1202 cleanup_env();
1203
1204 set_env("CELERY_ENABLE_UTC", "true");
1205 set_env("CELERY_TASK_TRACK_STARTED", "1");
1206 set_env("CELERY_TASK_SEND_SENT_EVENT", "yes");
1207 set_env("CELERY_TASK_ACKS_LATE", "on");
1208 set_env("CELERY_TASK_REJECT_ON_WORKER_LOST", "false");
1209
1210 let config = CeleryConfig::from_env();
1211 assert!(config.enable_utc);
1212 assert!(config.task_track_started);
1213 assert!(config.task_send_sent_event);
1214 assert!(config.task_acks_late);
1215 assert!(!config.task_reject_on_worker_lost);
1216
1217 cleanup_env();
1218 }
1219
1220 #[test]
1221 fn test_from_env_numeric_vars() {
1222 let _guard = ENV_LOCK.lock();
1223 cleanup_env();
1224
1225 set_env("CELERYD_PREFETCH_MULTIPLIER", "8");
1226 set_env("CELERYD_CONCURRENCY", "16");
1227 set_env("CELERYD_MAX_TASKS_PER_CHILD", "1000");
1228 set_env("CELERYD_MAX_MEMORY_PER_CHILD", "524288");
1229
1230 let config = CeleryConfig::from_env();
1231 assert_eq!(config.worker_prefetch_multiplier, 8);
1232 assert_eq!(config.worker_concurrency, 16);
1233 assert_eq!(config.worker_max_tasks_per_child, Some(1000));
1234 assert_eq!(config.worker_max_memory_per_child, Some(524288));
1235
1236 cleanup_env();
1237 }
1238
1239 #[test]
1240 fn test_from_env_string_vars() {
1241 let _guard = ENV_LOCK.lock();
1242 cleanup_env();
1243
1244 set_env("CELERY_DEFAULT_QUEUE", "myqueue");
1245 set_env("CELERY_DEFAULT_EXCHANGE", "myexchange");
1246 set_env("CELERY_DEFAULT_EXCHANGE_TYPE", "topic");
1247 set_env("CELERY_DEFAULT_ROUTING_KEY", "task.default");
1248 set_env("CELERY_RESULT_SERIALIZER", "msgpack");
1249
1250 let config = CeleryConfig::from_env();
1251 assert_eq!(config.task_default_queue, "myqueue");
1252 assert_eq!(config.task_default_exchange, "myexchange");
1253 assert_eq!(config.task_default_exchange_type, "topic");
1254 assert_eq!(config.task_default_routing_key, "task.default");
1255 assert_eq!(config.result_serializer, "msgpack");
1256
1257 cleanup_env();
1258 }
1259
1260 #[test]
1261 fn test_from_env_duration_vars() {
1262 let _guard = ENV_LOCK.lock();
1263 cleanup_env();
1264
1265 set_env("CELERY_TASK_TIME_LIMIT", "300");
1266 set_env("CELERY_TASK_SOFT_TIME_LIMIT", "240");
1267 set_env("CELERY_TASK_DEFAULT_RETRY_DELAY", "60");
1268 set_env("CELERY_TASK_MAX_RETRIES", "5");
1269 set_env("CELERY_RESULT_EXPIRES", "3600");
1270
1271 let config = CeleryConfig::from_env();
1272 assert_eq!(config.task_time_limit, Some(300));
1273 assert_eq!(config.task_soft_time_limit, Some(240));
1274 assert_eq!(config.task_default_retry_delay, 60);
1275 assert_eq!(config.task_max_retries, 5);
1276 assert_eq!(config.result_expires, 3600);
1277
1278 cleanup_env();
1279 }
1280
1281 #[test]
1282 fn test_parse_env_bool_variants() {
1283 let _guard = ENV_LOCK.lock();
1284 cleanup_env();
1285
1286 for val in &["true", "TRUE", "True", "1", "yes", "YES", "on", "ON"] {
1288 set_env("CELERY_ENABLE_UTC", val);
1289 assert_eq!(
1290 parse_env_bool("CELERY_ENABLE_UTC"),
1291 Some(true),
1292 "failed for {}",
1293 val
1294 );
1295 }
1296
1297 for val in &["false", "FALSE", "False", "0", "no", "NO", "off", "OFF"] {
1299 set_env("CELERY_ENABLE_UTC", val);
1300 assert_eq!(
1301 parse_env_bool("CELERY_ENABLE_UTC"),
1302 Some(false),
1303 "failed for {}",
1304 val
1305 );
1306 }
1307
1308 set_env("CELERY_ENABLE_UTC", "maybe");
1310 assert_eq!(parse_env_bool("CELERY_ENABLE_UTC"), None);
1311
1312 remove_env("CELERY_ENABLE_UTC");
1314 assert_eq!(parse_env_bool("CELERY_ENABLE_UTC"), None);
1315
1316 cleanup_env();
1317 }
1318
1319 #[test]
1320 fn test_validate_detailed_valid_config() {
1321 let config = CeleryConfig::default();
1322 let validation = config.validate_detailed();
1323 assert!(validation.is_valid());
1324 assert_eq!(validation.error_count(), 0);
1325 }
1326
1327 #[test]
1328 fn test_validate_detailed_invalid_broker_url() {
1329 let config = CeleryConfig {
1330 broker_url: "ftp://bad-scheme".to_string(),
1331 ..Default::default()
1332 };
1333 let validation = config.validate_detailed();
1334 assert!(!validation.is_valid());
1335 assert!(validation.errors.iter().any(|e| e.field == "broker_url"));
1336 }
1337
1338 #[test]
1339 fn test_validate_detailed_invalid_serializer() {
1340 let config = CeleryConfig {
1341 task_serializer: "xml".to_string(),
1342 ..Default::default()
1343 };
1344 let validation = config.validate_detailed();
1345 assert!(!validation.is_valid());
1346 assert!(validation
1347 .errors
1348 .iter()
1349 .any(|e| e.field == "task_serializer"));
1350 }
1351
1352 #[test]
1353 fn test_validate_detailed_zero_concurrency() {
1354 let config = CeleryConfig {
1355 worker_concurrency: 0,
1356 ..Default::default()
1357 };
1358 let validation = config.validate_detailed();
1359 assert!(!validation.is_valid());
1360 assert!(validation
1361 .errors
1362 .iter()
1363 .any(|e| e.field == "worker_concurrency"));
1364 }
1365
1366 #[test]
1367 fn test_validate_detailed_time_limit_warning() {
1368 let config = CeleryConfig {
1369 task_time_limit: Some(60),
1370 task_soft_time_limit: Some(120), ..Default::default()
1372 };
1373 let validation = config.validate_detailed();
1374 assert!(validation.has_warnings());
1375 assert!(validation
1376 .warnings
1377 .iter()
1378 .any(|w| w.field == "task_soft_time_limit"));
1379 }
1380
1381 #[test]
1382 fn test_to_env_vars_roundtrip() {
1383 let config = CeleryConfig::new("amqp://localhost:5672")
1384 .with_result_backend("redis://localhost:6379/1")
1385 .with_task_serializer("msgpack")
1386 .with_result_serializer("json")
1387 .with_timezone("US/Eastern")
1388 .with_enable_utc(false)
1389 .with_worker_concurrency(12)
1390 .with_prefetch_multiplier(2)
1391 .with_default_queue("tasks");
1392
1393 let vars = config.to_env_vars();
1394
1395 let find_var = |key: &str| -> Option<String> {
1397 vars.iter().find(|(k, _)| k == key).map(|(_, v)| v.clone())
1398 };
1399
1400 assert_eq!(
1401 find_var("CELERY_BROKER_URL").as_deref(),
1402 Some("amqp://localhost:5672")
1403 );
1404 assert_eq!(
1405 find_var("CELERY_RESULT_BACKEND").as_deref(),
1406 Some("redis://localhost:6379/1")
1407 );
1408 assert_eq!(
1409 find_var("CELERY_TASK_SERIALIZER").as_deref(),
1410 Some("msgpack")
1411 );
1412 assert_eq!(
1413 find_var("CELERY_RESULT_SERIALIZER").as_deref(),
1414 Some("json")
1415 );
1416 assert_eq!(find_var("CELERY_TIMEZONE").as_deref(), Some("US/Eastern"));
1417 assert_eq!(find_var("CELERY_ENABLE_UTC").as_deref(), Some("false"));
1418 assert_eq!(find_var("CELERYD_CONCURRENCY").as_deref(), Some("12"));
1419 assert_eq!(
1420 find_var("CELERYD_PREFETCH_MULTIPLIER").as_deref(),
1421 Some("2")
1422 );
1423 assert_eq!(find_var("CELERY_DEFAULT_QUEUE").as_deref(), Some("tasks"));
1424 }
1425
1426 #[test]
1427 fn test_dump_output() {
1428 let config = CeleryConfig::default();
1429 let output = config.dump();
1430
1431 assert!(output.starts_with("CeleRS Configuration:\n"));
1432 assert!(output.contains("broker_url:"));
1433 assert!(output.contains("task_serializer:"));
1434 assert!(output.contains("worker_concurrency:"));
1435 assert!(output.contains("result_expires:"));
1436 assert!(output.contains("task_routes:"));
1437 assert!(output.contains("beat_schedule:"));
1438 }
1439
1440 #[test]
1441 fn test_config_validation_display() {
1442 let error = ConfigError {
1443 field: "broker_url".to_string(),
1444 message: "invalid URL".to_string(),
1445 suggestion: Some("use redis://".to_string()),
1446 };
1447 let display = format!("{}", error);
1448 assert!(display.contains("[broker_url]"));
1449 assert!(display.contains("invalid URL"));
1450 assert!(display.contains("suggestion: use redis://"));
1451
1452 let error_no_suggestion = ConfigError {
1453 field: "concurrency".to_string(),
1454 message: "must be positive".to_string(),
1455 suggestion: None,
1456 };
1457 let display2 = format!("{}", error_no_suggestion);
1458 assert!(display2.contains("[concurrency]"));
1459 assert!(display2.contains("must be positive"));
1460 assert!(!display2.contains("suggestion"));
1461
1462 let warning = ConfigWarning {
1463 field: "prefetch".to_string(),
1464 message: "value too high".to_string(),
1465 };
1466 let display3 = format!("{}", warning);
1467 assert!(display3.contains("[prefetch]"));
1468 assert!(display3.contains("value too high"));
1469 }
1470
1471 #[test]
1472 fn test_validate_detailed_high_concurrency_warning() {
1473 let config = CeleryConfig {
1474 worker_concurrency: 2048,
1475 ..Default::default()
1476 };
1477 let validation = config.validate_detailed();
1478 assert!(validation.has_warnings());
1479 assert!(validation
1480 .warnings
1481 .iter()
1482 .any(|w| w.field == "worker_concurrency"));
1483 }
1484
1485 #[test]
1486 fn test_validate_detailed_prefetch_zero_warning() {
1487 let config = CeleryConfig {
1488 worker_prefetch_multiplier: 0,
1489 ..Default::default()
1490 };
1491 let validation = config.validate_detailed();
1492 assert!(validation.has_warnings());
1493 assert!(validation
1494 .warnings
1495 .iter()
1496 .any(|w| w.field == "worker_prefetch_multiplier"));
1497 }
1498
1499 #[test]
1500 fn test_validate_detailed_high_retries_warning() {
1501 let config = CeleryConfig {
1502 task_max_retries: 200,
1503 ..Default::default()
1504 };
1505 let validation = config.validate_detailed();
1506 assert!(validation.has_warnings());
1507 assert!(validation
1508 .warnings
1509 .iter()
1510 .any(|w| w.field == "task_max_retries"));
1511 }
1512
1513 #[test]
1514 fn test_validate_detailed_invalid_result_backend() {
1515 let config = CeleryConfig {
1516 result_backend: Some("ftp://invalid".to_string()),
1517 ..Default::default()
1518 };
1519 let validation = config.validate_detailed();
1520 assert!(!validation.is_valid());
1521 assert!(validation
1522 .errors
1523 .iter()
1524 .any(|e| e.field == "result_backend"));
1525 }
1526
1527 #[test]
1528 fn test_validate_detailed_invalid_result_serializer() {
1529 let config = CeleryConfig {
1530 result_serializer: "xml".to_string(),
1531 ..Default::default()
1532 };
1533 let validation = config.validate_detailed();
1534 assert!(!validation.is_valid());
1535 assert!(validation
1536 .errors
1537 .iter()
1538 .any(|e| e.field == "result_serializer"));
1539 }
1540
1541 #[test]
1542 fn test_config_validation_counts() {
1543 let mut validation = ConfigValidation::new();
1544 assert!(validation.is_valid());
1545 assert!(!validation.has_warnings());
1546 assert_eq!(validation.error_count(), 0);
1547 assert_eq!(validation.warning_count(), 0);
1548
1549 validation.add_error("f1", "e1", None);
1550 validation.add_warning("f2", "w1");
1551 assert!(!validation.is_valid());
1552 assert!(validation.has_warnings());
1553 assert_eq!(validation.error_count(), 1);
1554 assert_eq!(validation.warning_count(), 1);
1555 }
1556}