celery_rs_core/broker/
broker.rs

1/*
2Implementation of available brokers in a non-asynchronous manner.
3
4Author Andrew Evans
5*/
6
7use std::any::Any;
8use std::collections::BTreeMap;
9use std::env::Args;
10use std::error::Error;
11use std::iter::Map;
12
13use amiquip::{AmqpProperties, AmqpValue, Channel, Exchange, ExchangeDeclareOptions, ExchangeType, FieldTable, Publish, Queue, QueueDeclareOptions};
14use serde_json::{to_string, Value};
15use serde_json::map::Values;
16
17use crate::amqp::{exchange_error::ExchangeError, publish_error::PublishError, queue_error::QueueError};
18use crate::amqp::amqp::AMQPConnectionInf;
19use crate::argparse::kwargs::KwArgs;
20use crate::config::config::CeleryConfig;
21use crate::connection::rabbitmq_connection_pool::ThreadableRabbitMQConnectionPool;
22use crate::connection::threadable_rabbit_mq_connection::ThreadableRabbitMQConnection;
23use crate::message_protocol::{headers::Headers, message::Message, message_body::MessageBody, properties::Properties};
24use crate::task::config::TaskConfig;
25
26/// RabbitMQ Broker
27pub struct RabbitMQBroker{
28    config: CeleryConfig,
29}
30
31
32/// AMQP Broker
33pub trait AMQPBroker{
34
35    /// bind queue to the exchange
36    fn bind_to_exchange(config: CeleryConfig, channel: &Channel, exchange: String, queue: String, routing_key: String) -> Result<bool, ExchangeError>;
37
38    /// create a queue
39    fn create_queue(config: CeleryConfig, channel: &Channel, durable: bool, queue: String, declare_exchange: bool, uuid: String, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, QueueError>;
40
41    /// create an exchange
42    fn create_exchange(config: CeleryConfig, channel: &Channel, durable: bool, exchange: String, exchange_type: ExchangeType) -> Result<bool, ExchangeError>;
43
44    /// send task to the broker
45    fn send_task(config: CeleryConfig, channel: &Channel, props: Properties, headers: Headers, body: MessageBody, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, PublishError>;
46}
47
48
49/// AMQP Broker
50impl AMQPBroker for RabbitMQBroker{
51
52    /// create the exchange
53    fn create_exchange(config: CeleryConfig, channel: &Channel, durable: bool, exchange: String, exchange_type: ExchangeType) -> Result<bool, ExchangeError> {
54        let mut opts = ExchangeDeclareOptions::default();
55        opts.durable = durable;
56        let r = channel.exchange_declare(exchange_type, exchange, opts);
57        if r.is_ok(){
58            Ok(true)
59        }else{
60            Err(ExchangeError)
61        }
62    }
63
64    /// create a queue
65    fn create_queue(config: CeleryConfig, channel: &Channel, durable: bool, queue: String, declare_exchange: bool, uuid: String, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, QueueError>{
66        let mut qopts = QueueDeclareOptions::default();
67        if declare_exchange{
68            let mut etype = ExchangeType::Direct;
69            let mut eopts= ExchangeDeclareOptions::default();
70            eopts.durable = durable;
71            channel.exchange_declare(etype, exchange.clone().unwrap(), eopts);
72        }
73        if durable {
74            qopts.durable = durable;
75        }
76        let r = channel.queue_declare(queue.clone(), qopts);
77        if r.is_ok(){
78            //bind queue to exchange
79            if exchange.is_some(){
80                let exchange_name = exchange.unwrap();
81                let args = FieldTable::new();
82                let mut m_routing_key = config.default_routing_key.clone();
83                if routing_key.is_some(){
84                    m_routing_key = routing_key.unwrap();
85                }
86                let er = channel.queue_bind(queue, exchange_name, m_routing_key, args);
87                if er.is_ok(){
88                    Ok(true)
89                }else{
90                    Err(QueueError)
91                }
92            }else {
93                Ok(true)
94            }
95        }else{
96            Err(QueueError)
97        }
98    }
99
100    /// bind a queue to an exchange
101    fn bind_to_exchange(config: CeleryConfig, channel: &Channel, exchange: String, queue: String, routing_key: String) -> Result<bool, ExchangeError> {
102        let args = FieldTable::new();
103        let r = channel.queue_bind(queue, exchange, routing_key, args);
104        if r.is_ok(){
105            Ok(true)
106        }else{
107            Err(ExchangeError)
108        }
109    }
110
111    /// send a task to the broker
112    fn send_task(config: CeleryConfig, channel: &Channel, props: Properties, headers: Headers, body: MessageBody, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, PublishError> {
113        let cfg = config.clone();
114        let mut amq_properties = props.convert_to_amqp_properties();
115        let amq_headers = headers.convert_to_btree_map();
116        let json_val = Value::from(body.convert_to_json_map());
117        let mut json_message = to_string(&json_val);
118        if json_message.is_ok() {
119            let mut m_routing_key = cfg.default_routing_key.clone();
120            let mut m_exchange = cfg.default_routing_key;
121            if exchange.is_some(){
122                m_exchange = exchange.unwrap();
123            }
124            if routing_key.is_some(){
125                m_routing_key = routing_key.unwrap();
126            }
127            amq_properties = amq_properties.with_headers(amq_headers);
128            let jmessage = json_message.unwrap();
129            let jbytes = jmessage.as_bytes();
130            let mut publish = Publish::with_properties(jbytes, m_routing_key, amq_properties);
131            channel.basic_publish(m_exchange, publish);
132            Ok(true)
133        }else{
134            let e = PublishError;
135            Err(e)
136        }
137    }
138}
139
140
141/// Rabbit MQ broker
142impl RabbitMQBroker{
143
144    /// Start a task
145    pub fn start_task(&self) {
146
147    }
148
149    /// Create a new broker
150    pub fn new(config: CeleryConfig) -> RabbitMQBroker{
151        RabbitMQBroker{
152            config: config.clone(),
153        }
154    }
155}
156
157
/// Integration tests for the RabbitMQ broker.
///
/// Every test talks to a live broker using the settings from `get_config`
/// (`127.0.0.1:5672`); a test panics when no connection can be obtained.
#[cfg(test)]
mod tests {
    use std::borrow::BorrowMut;
    use std::ops::Deref;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::thread::JoinHandle;

    use amq_protocol::frame::AMQPFrameType::Header;
    use tokio::prelude::*;
    use tokio::runtime::Runtime;
    use uuid::Uuid;

    use crate::amqp::amqp::AMQPConnectionInf;
    use crate::backend::config::BackendConfig;
    use crate::broker::broker::{AMQPBroker, RabbitMQBroker};
    use crate::config::config::CeleryConfig;
    use crate::connection::rabbitmq_connection_pool::ThreadableRabbitMQConnectionPool;

    use super::*;

    /// Build the configuration shared by all tests.
    fn get_config() -> CeleryConfig {
        let broker_conn = AMQPConnectionInf::new(
            "amqp".to_string(),
            "127.0.0.1".to_string(),
            5672,
            Some("test".to_string()),
            Some("dev".to_string()),
            Some("rtp*4500".to_string()),
            false,
        );
        let backend = BackendConfig {
            url: "rpc://".to_string(),
            username: None,
            password: None,
            transport_options: None,
        };
        CeleryConfig::new(broker_conn, backend)
    }

    /// Declare the durable `test_queue`, declare `test_exchange`, and bind the
    /// two under `test_routing_key`.
    fn declare_test_queue(conf: &CeleryConfig, channel: &Channel) -> Result<bool, QueueError> {
        RabbitMQBroker::create_queue(
            conf.clone(),
            channel,
            true,
            String::from("test_queue"),
            true,
            Uuid::new_v4().to_string(),
            Some("test_exchange".to_string()),
            Some("test_routing_key".to_string()),
        )
    }

    /// Publish one `test_task` message with an empty body to `test_exchange`.
    ///
    /// A single fresh UUID string is passed three times: twice to `Headers::new`
    /// and once to `Properties::new`.
    fn send_test_task(conf: &CeleryConfig, channel: &Channel) -> Result<bool, PublishError> {
        let body = MessageBody::new(None, None, None, None);
        let ustr = Uuid::new_v4().to_string();
        let headers = Headers::new("rs".to_string(), "test_task".to_string(), ustr.clone(), ustr.clone());
        let props = Properties::new(ustr, "application/json".to_string(), "utf-8".to_string(), None);
        RabbitMQBroker::send_task(
            conf.clone(),
            channel,
            props,
            headers,
            body,
            Some("test_exchange".to_string()),
            Some("test_routing_key".to_string()),
        )
    }

    #[test]
    fn should_create_queue() {
        let conf = get_config();
        let mut pool = ThreadableRabbitMQConnectionPool::new(conf.connection_inf.clone(), 2);
        pool.start();
        let mut c = pool.get_connection().expect("no RabbitMQ connection available");
        let channel = c.connection.open_channel(None).unwrap();

        let rq = declare_test_queue(&conf, &channel);

        // Close before asserting so the connection is released even on failure.
        c.connection.close();
        assert!(rq.is_ok());
    }

    #[test]
    fn should_create_and_bind_queue_to_exchange() {
        let conf = get_config();
        let mut pool = ThreadableRabbitMQConnectionPool::new(conf.connection_inf.clone(), 2);
        pool.start();
        let mut c = pool.get_connection().expect("no RabbitMQ connection available");
        let channel = c.connection.open_channel(None).unwrap();

        let rq = declare_test_queue(&conf, &channel);
        let rb = RabbitMQBroker::bind_to_exchange(
            conf.clone(),
            &channel,
            "test_exchange".to_string(),
            "test_queue".to_string(),
            "test_routing_key".to_string(),
        );

        c.connection.close();
        assert!(rq.is_ok());
        // The explicit binding is what this test is named after, so check it too.
        assert!(rb.is_ok());
    }

    #[test]
    fn should_send_task_to_queue() {
        let conf = get_config();
        let mut pool = ThreadableRabbitMQConnectionPool::new(conf.connection_inf.clone(), 2);
        pool.start();
        let mut c = pool.get_connection().expect("no RabbitMQ connection available");
        let channel = c.connection.open_channel(None).unwrap();

        // create queue if necessary
        let rq = declare_test_queue(&conf, &channel);
        // create and send task
        let br = send_test_task(&conf, &channel);

        c.connection.close();
        assert!(br.is_ok());
        assert!(rq.is_ok());
    }

    #[test]
    fn should_work_with_threads() {
        let cnf = get_config();
        let mut pool = ThreadableRabbitMQConnectionPool::new(cnf.connection_inf.clone(), 2);
        pool.start();
        let mut c = pool.get_connection().expect("no RabbitMQ connection available");

        // Three publisher threads, each with its own channel on the shared connection.
        let mut workers = Vec::new();
        for _ in 0..3 {
            let channel = c.connection.open_channel(None).unwrap();
            let conf = cnf.clone();
            workers.push(thread::spawn(move || {
                for _ in 0..2666 {
                    assert!(declare_test_queue(&conf, &channel).is_ok());
                    assert!(send_test_task(&conf, &channel).is_ok());
                }
            }));
        }

        // A failed assertion inside a worker surfaces here as a join error.
        for worker in workers {
            worker.join().expect("publisher thread panicked");
        }
        // NOTE(review): presumably only touches the connection so it stays in use
        // until the workers are done - confirm.
        c.connection.server_properties();
    }

    #[test]
    fn should_work_with_tokio() {
        let rt = Runtime::new().unwrap();
        let cnf = get_config();
        let mut pool = ThreadableRabbitMQConnectionPool::new(cnf.connection_inf.clone(), 2);
        pool.start();
        let mut c = pool.get_connection().expect("no RabbitMQ connection available");

        // Failures are counted here and checked once at the end, instead of
        // asserting inside the spawned tasks.
        let failures = Arc::new(AtomicUsize::new(0));
        for _ in 0..8000 {
            let channel = c.connection.open_channel(None).unwrap();
            let conf = cnf.clone();
            let failures = Arc::clone(&failures);
            let _ = rt.spawn(async move {
                // create queue if necessary
                if declare_test_queue(&conf, &channel).is_err() {
                    failures.fetch_add(1, Ordering::SeqCst);
                }
                // create and send task
                if send_test_task(&conf, &channel).is_err() {
                    failures.fetch_add(1, Ordering::SeqCst);
                }
            });
        }
        rt.shutdown_on_idle();
        c.connection.server_properties();
        assert_eq!(failures.load(Ordering::SeqCst), 0);
    }
}