celery_rs_core/config/
config.rs

//! The configuration object and its supporting types.
//!
//! Author: Andrew Evans
3
4use amiquip::ExchangeType;
5use num_cpus;
6use std::collections::HashMap;
7use crate::amqp::amqp::AMQPConnectionInf;
8use crate::backend::config::BackendConfig;
9
10
/// Persistence setting used by `CeleryConfig` for `default_delivery_mode`
/// and `result_persistent`.
///
/// The type is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuePersistenceType {
    /// Persistent mode.
    PERSISTENT,
    /// Non-persistent mode.
    NONPERSISTENT,
}
16
17
/// Tag stored in `CeleryConfig::backend_type`.
///
/// The type is a plain tag, so it is `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendType {
    /// RPC backend (the value `CeleryConfig::new` selects).
    RPC,
    /// Redis backend.
    REDIS,
}
23
24
/// Tag stored in `CeleryConfig::broker_type`.
///
/// Only one variant exists at present. The type is a plain tag, so it is
/// `Copy` and comparable with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BrokerType {
    /// RabbitMQ broker.
    RABBITMQ,
}
29
30
/// An administrator contact, as stored in `CeleryConfig::admins`.
///
/// The fields are public (like those of `CeleryConfig`) so that code outside
/// this module can build and read entries for the `admins` list.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Admin {
    /// Name of the administrator.
    pub name: String,
    /// E-mail address of the administrator.
    pub email: String,
}

impl Admin {
    /// Creates an administrator entry from a name and an e-mail address.
    ///
    /// Both arguments accept anything convertible into a `String`
    /// (for example `&str` or `String`).
    pub fn new(name: impl Into<String>, email: impl Into<String>) -> Admin {
        Admin {
            name: name.into(),
            email: email.into(),
        }
    }
}
36
37
38#[derive(Clone, Debug)]
39pub struct CeleryConfig{
40    pub connection_inf: AMQPConnectionInf,
41    pub broker_connection_retry: bool,
42    pub result_backend: BackendConfig,
43    pub celery_cache_backend: Option<String>,
44    pub send_events: bool,
45    pub queues: Vec<String>,
46    pub default_exchange_type: ExchangeType,
47    pub default_queue: String,
48    pub broadcast_exchange: String,
49    pub broadcast_exchange_type: ExchangeType,
50    pub event_queue: String,
51    pub event_exchange: String,
52    pub event_exchange_type: ExchangeType,
53    pub event_routing_key: String,
54    pub event_serializer: String,
55    pub result_exchange: String,
56    pub accept_content: String,
57    pub worker_prefetch_multiplier: i8,
58    pub default_delivery_mode: QueuePersistenceType,
59    pub default_routing_key: String,
60    pub broker_connection_timeout: i64,
61    pub broker_connection_max_retries: i64,
62    pub broadcast_queue: String,
63    pub backend_type: BackendType,
64    pub broker_type: BrokerType,
65    pub celery_send_task_error_emails: bool,
66    pub admins: Vec<Admin>,
67    pub server_email: String,
68    pub mail_host: String,
69    pub mail_host_user: Option<String>,
70    pub mail_host_password: Option<String>,
71    pub mail_port: i8,
72    pub always_eager: bool,
73    pub eager_propogates_exceptions: bool,
74    pub track_started: bool,
75    pub acks_late: bool,
76    pub store_errors_even_if_ignored: bool,
77    pub task_result_expires: i64,
78    pub ignore_result: bool,
79    pub max_cached_results: i32,
80    pub result_persistent: QueuePersistenceType,
81    pub result_serializer: String,
82    pub database_engine_options: Option<HashMap<String, String>>,
83    pub default_rate_limit: i8,
84    pub disable_rate_limits: bool,
85    pub celerybeat_log_level: String,
86    pub celerybeat_log_file: Option<String>,
87    pub celerybeat_schedule_file_name: String,
88    pub celerybeat_max_loop_interval: i64,
89    pub celerymon_log_level: String,
90    pub celerymon_log_file: Option<String>,
91    pub num_connections: usize,
92}
93
94impl CeleryConfig{
95
96    pub fn new(broker_inf: AMQPConnectionInf, backend: BackendConfig) -> CeleryConfig{
97        CeleryConfig{
98            connection_inf: broker_inf,
99            broker_connection_retry: true,
100            result_backend: backend,
101            celery_cache_backend: None,
102            send_events: false,
103            queues: Vec::<String>::new(),
104            default_exchange_type: ExchangeType::Direct,
105            default_queue: String::from("celery"),
106            broadcast_exchange: String::from("celeryctl"),
107            broadcast_exchange_type: ExchangeType::Fanout,
108            event_queue: String::from("celeryevent"),
109            event_exchange: String::from("celery_event"),
110            event_exchange_type: ExchangeType::Topic,
111            event_routing_key: String::from("celeryevent"),
112            event_serializer: String::from("json"),
113            result_exchange: String::from("celeryresult"),
114            accept_content: String::from("application/json"),
115            worker_prefetch_multiplier: 4,
116            default_delivery_mode: QueuePersistenceType::PERSISTENT,
117            default_routing_key: String::from("celery"),
118            broker_connection_timeout: 10000,
119            broker_connection_max_retries: 1000,
120            broadcast_queue: String::from("celeryctl"),
121            backend_type: BackendType::RPC,
122            broker_type: BrokerType::RABBITMQ,
123            celery_send_task_error_emails: false,
124            admins: Vec::<Admin>::new(),
125            server_email: String::from("celery@localhost"),
126            mail_host: String::from("localhost"),
127            mail_host_user: None,
128            mail_host_password: None,
129            mail_port: 25,
130            always_eager: false,
131            eager_propogates_exceptions: true,
132            track_started: false,
133            acks_late: true,
134            store_errors_even_if_ignored: false,
135            task_result_expires: 600000,
136            ignore_result: false,
137            max_cached_results: 100,
138            result_persistent: QueuePersistenceType::NONPERSISTENT,
139            result_serializer: String::from("json"),
140            database_engine_options: None,
141            default_rate_limit: -1,
142            disable_rate_limits: true,
143            celerybeat_log_level: String::from("INFO"),
144            celerybeat_log_file: None,
145            celerybeat_schedule_file_name: String::from("celerybeat-schedule"),
146            celerybeat_max_loop_interval: 300000,
147            celerymon_log_level: String::from("INFO"),
148            celerymon_log_file: None,
149            num_connections: num_cpus::get(),
150        }
151    }
152}
153
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the configuration shared by the tests below: an AMQP broker on
    /// 127.0.0.1:5672 and a placeholder backend.
    fn sample_config() -> CeleryConfig {
        let broker_conf = AMQPConnectionInf::new(
            String::from("amqp"),
            String::from("127.0.0.1"),
            5672,
            Some(String::from("test")),
            Some(String::from("dev")),
            Some(String::from("rtp*4500")),
            false,
        );
        let backend = BackendConfig {
            url: "fake".to_string(),
            username: None,
            password: None,
            transport_options: None,
        };
        CeleryConfig::new(broker_conf, backend)
    }

    #[test]
    fn should_create_a_configuration() {
        let c = sample_config();
        let url = c.connection_inf.to_url();
        // assert_eq! prints the actual URL on failure, unlike assert!(a.eq(b)).
        assert_eq!(url, "amqp://dev:rtp*4500@127.0.0.1:5672/test");
    }

    #[test]
    fn should_apply_default_settings() {
        let c = sample_config();
        // Spot-check a sample of the defaults that `CeleryConfig::new` sets.
        assert!(c.broker_connection_retry);
        assert!(c.celery_cache_backend.is_none());
        assert!(c.queues.is_empty());
        assert_eq!(c.default_queue, "celery");
        assert_eq!(c.default_routing_key, "celery");
        assert_eq!(c.broadcast_queue, "celeryctl");
        assert_eq!(c.accept_content, "application/json");
        assert_eq!(c.worker_prefetch_multiplier, 4);
        assert!(c.admins.is_empty());
        assert_eq!(c.mail_port, 25);
        assert!(c.acks_late);
        assert!(!c.always_eager);
        assert_eq!(c.default_rate_limit, -1);
        assert_eq!(c.result_backend.url, "fake");
    }
}