1use amiquip::ExchangeType;
5use num_cpus;
6use std::collections::HashMap;
7use crate::amqp::amqp::AMQPConnectionInf;
8use crate::backend::config::BackendConfig;
9
10
/// Persistence selector used by `CeleryConfig::default_delivery_mode` and
/// `CeleryConfig::result_persistent`.
///
/// The enum is a plain tag with no payload, so it is `Copy` and can be
/// compared with `==`.
///
/// NOTE(review): how each variant is translated into broker/queue flags is not
/// visible in this file — confirm against the code that consumes it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuePersistenceType {
    /// Persistent mode.
    PERSISTENT,
    /// Non-persistent mode.
    NONPERSISTENT,
}
16
17
/// Kind of result backend selected through `CeleryConfig::backend_type`.
///
/// The enum is a plain tag with no payload, so it is `Copy` and can be
/// compared with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendType {
    /// RPC backend; this is the value `CeleryConfig::new` picks.
    RPC,
    /// Redis backend.
    REDIS,
}
23
24
/// Kind of message broker selected through `CeleryConfig::broker_type`.
///
/// Only one broker is currently listed. The enum is a plain tag with no
/// payload, so it is `Copy` and can be compared with `==`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BrokerType {
    /// RabbitMQ broker; this is the value `CeleryConfig::new` picks.
    RABBITMQ,
}
29
30
/// An administrator contact entry, stored in `CeleryConfig::admins`.
///
/// The fields are public so that code outside this module can actually build
/// and read entries; with private fields and no constructor the public
/// `admins` list could never be populated by callers.
///
/// NOTE(review): presumably these are the recipients used when
/// `celery_send_task_error_emails` is enabled — confirm against the mailer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Admin {
    /// Name of the administrator.
    pub name: String,
    /// E-mail address of the administrator.
    pub email: String,
}
36
37
38#[derive(Clone, Debug)]
39pub struct CeleryConfig{
40 pub connection_inf: AMQPConnectionInf,
41 pub broker_connection_retry: bool,
42 pub result_backend: BackendConfig,
43 pub celery_cache_backend: Option<String>,
44 pub send_events: bool,
45 pub queues: Vec<String>,
46 pub default_exchange_type: ExchangeType,
47 pub default_queue: String,
48 pub broadcast_exchange: String,
49 pub broadcast_exchange_type: ExchangeType,
50 pub event_queue: String,
51 pub event_exchange: String,
52 pub event_exchange_type: ExchangeType,
53 pub event_routing_key: String,
54 pub event_serializer: String,
55 pub result_exchange: String,
56 pub accept_content: String,
57 pub worker_prefetch_multiplier: i8,
58 pub default_delivery_mode: QueuePersistenceType,
59 pub default_routing_key: String,
60 pub broker_connection_timeout: i64,
61 pub broker_connection_max_retries: i64,
62 pub broadcast_queue: String,
63 pub backend_type: BackendType,
64 pub broker_type: BrokerType,
65 pub celery_send_task_error_emails: bool,
66 pub admins: Vec<Admin>,
67 pub server_email: String,
68 pub mail_host: String,
69 pub mail_host_user: Option<String>,
70 pub mail_host_password: Option<String>,
71 pub mail_port: i8,
72 pub always_eager: bool,
73 pub eager_propogates_exceptions: bool,
74 pub track_started: bool,
75 pub acks_late: bool,
76 pub store_errors_even_if_ignored: bool,
77 pub task_result_expires: i64,
78 pub ignore_result: bool,
79 pub max_cached_results: i32,
80 pub result_persistent: QueuePersistenceType,
81 pub result_serializer: String,
82 pub database_engine_options: Option<HashMap<String, String>>,
83 pub default_rate_limit: i8,
84 pub disable_rate_limits: bool,
85 pub celerybeat_log_level: String,
86 pub celerybeat_log_file: Option<String>,
87 pub celerybeat_schedule_file_name: String,
88 pub celerybeat_max_loop_interval: i64,
89 pub celerymon_log_level: String,
90 pub celerymon_log_file: Option<String>,
91 pub num_connections: usize,
92}
93
94impl CeleryConfig{
95
96 pub fn new(broker_inf: AMQPConnectionInf, backend: BackendConfig) -> CeleryConfig{
97 CeleryConfig{
98 connection_inf: broker_inf,
99 broker_connection_retry: true,
100 result_backend: backend,
101 celery_cache_backend: None,
102 send_events: false,
103 queues: Vec::<String>::new(),
104 default_exchange_type: ExchangeType::Direct,
105 default_queue: String::from("celery"),
106 broadcast_exchange: String::from("celeryctl"),
107 broadcast_exchange_type: ExchangeType::Fanout,
108 event_queue: String::from("celeryevent"),
109 event_exchange: String::from("celery_event"),
110 event_exchange_type: ExchangeType::Topic,
111 event_routing_key: String::from("celeryevent"),
112 event_serializer: String::from("json"),
113 result_exchange: String::from("celeryresult"),
114 accept_content: String::from("application/json"),
115 worker_prefetch_multiplier: 4,
116 default_delivery_mode: QueuePersistenceType::PERSISTENT,
117 default_routing_key: String::from("celery"),
118 broker_connection_timeout: 10000,
119 broker_connection_max_retries: 1000,
120 broadcast_queue: String::from("celeryctl"),
121 backend_type: BackendType::RPC,
122 broker_type: BrokerType::RABBITMQ,
123 celery_send_task_error_emails: false,
124 admins: Vec::<Admin>::new(),
125 server_email: String::from("celery@localhost"),
126 mail_host: String::from("localhost"),
127 mail_host_user: None,
128 mail_host_password: None,
129 mail_port: 25,
130 always_eager: false,
131 eager_propogates_exceptions: true,
132 track_started: false,
133 acks_late: true,
134 store_errors_even_if_ignored: false,
135 task_result_expires: 600000,
136 ignore_result: false,
137 max_cached_results: 100,
138 result_persistent: QueuePersistenceType::NONPERSISTENT,
139 result_serializer: String::from("json"),
140 database_engine_options: None,
141 default_rate_limit: -1,
142 disable_rate_limits: true,
143 celerybeat_log_level: String::from("INFO"),
144 celerybeat_log_file: None,
145 celerybeat_schedule_file_name: String::from("celerybeat-schedule"),
146 celerybeat_max_loop_interval: 300000,
147 celerymon_log_level: String::from("INFO"),
148 celerymon_log_file: None,
149 num_connections: num_cpus::get(),
150 }
151 }
152}
153
154
#[cfg(test)]
mod tests {
    use super::*;

    /// `CeleryConfig::new` must keep the broker connection details it was
    /// given and fill in the built-in defaults for the remaining fields.
    #[test]
    fn should_create_a_configuration() {
        let broker_conf = AMQPConnectionInf::new(
            String::from("amqp"),
            String::from("127.0.0.1"),
            5672,
            Some(String::from("test")),
            Some(String::from("dev")),
            Some(String::from("rtp*4500")),
            false,
        );
        let backend = BackendConfig {
            url: "fake".to_string(),
            username: None,
            password: None,
            transport_options: None,
        };

        let config = CeleryConfig::new(broker_conf, backend);

        // `assert_eq!` prints both values on failure, unlike `assert!(a.eq(b))`.
        assert_eq!(
            config.connection_inf.to_url(),
            "amqp://dev:rtp*4500@127.0.0.1:5672/test"
        );
        // Spot-check two of the defaults assigned by `new`.
        assert_eq!(config.default_queue, "celery");
        assert_eq!(config.result_serializer, "json");
    }
}