1use std::any::Any;
8use std::collections::BTreeMap;
9use std::env::Args;
10use std::error::Error;
11use std::iter::Map;
12
13use amiquip::{AmqpProperties, AmqpValue, Channel, Exchange, ExchangeDeclareOptions, ExchangeType, FieldTable, Publish, Queue, QueueDeclareOptions};
14use serde_json::{to_string, Value};
15use serde_json::map::Values;
16
17use crate::amqp::{exchange_error::ExchangeError, publish_error::PublishError, queue_error::QueueError};
18use crate::amqp::amqp::AMQPConnectionInf;
19use crate::argparse::kwargs::KwArgs;
20use crate::config::config::CeleryConfig;
21use crate::connection::rabbitmq_connection_pool::ThreadableRabbitMQConnectionPool;
22use crate::connection::threadable_rabbit_mq_connection::ThreadableRabbitMQConnection;
23use crate::message_protocol::{headers::Headers, message::Message, message_body::MessageBody, properties::Properties};
24use crate::task::config::TaskConfig;
25
26pub struct RabbitMQBroker{
28 config: CeleryConfig,
29}
30
31
32pub trait AMQPBroker{
34
35 fn bind_to_exchange(config: CeleryConfig, channel: &Channel, exchange: String, queue: String, routing_key: String) -> Result<bool, ExchangeError>;
37
38 fn create_queue(config: CeleryConfig, channel: &Channel, durable: bool, queue: String, declare_exchange: bool, uuid: String, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, QueueError>;
40
41 fn create_exchange(config: CeleryConfig, channel: &Channel, durable: bool, exchange: String, exchange_type: ExchangeType) -> Result<bool, ExchangeError>;
43
44 fn send_task(config: CeleryConfig, channel: &Channel, props: Properties, headers: Headers, body: MessageBody, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, PublishError>;
46}
47
48
49impl AMQPBroker for RabbitMQBroker{
51
52 fn create_exchange(config: CeleryConfig, channel: &Channel, durable: bool, exchange: String, exchange_type: ExchangeType) -> Result<bool, ExchangeError> {
54 let mut opts = ExchangeDeclareOptions::default();
55 opts.durable = durable;
56 let r = channel.exchange_declare(exchange_type, exchange, opts);
57 if r.is_ok(){
58 Ok(true)
59 }else{
60 Err(ExchangeError)
61 }
62 }
63
64 fn create_queue(config: CeleryConfig, channel: &Channel, durable: bool, queue: String, declare_exchange: bool, uuid: String, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, QueueError>{
66 let mut qopts = QueueDeclareOptions::default();
67 if declare_exchange{
68 let mut etype = ExchangeType::Direct;
69 let mut eopts= ExchangeDeclareOptions::default();
70 eopts.durable = durable;
71 channel.exchange_declare(etype, exchange.clone().unwrap(), eopts);
72 }
73 if durable {
74 qopts.durable = durable;
75 }
76 let r = channel.queue_declare(queue.clone(), qopts);
77 if r.is_ok(){
78 if exchange.is_some(){
80 let exchange_name = exchange.unwrap();
81 let args = FieldTable::new();
82 let mut m_routing_key = config.default_routing_key.clone();
83 if routing_key.is_some(){
84 m_routing_key = routing_key.unwrap();
85 }
86 let er = channel.queue_bind(queue, exchange_name, m_routing_key, args);
87 if er.is_ok(){
88 Ok(true)
89 }else{
90 Err(QueueError)
91 }
92 }else {
93 Ok(true)
94 }
95 }else{
96 Err(QueueError)
97 }
98 }
99
100 fn bind_to_exchange(config: CeleryConfig, channel: &Channel, exchange: String, queue: String, routing_key: String) -> Result<bool, ExchangeError> {
102 let args = FieldTable::new();
103 let r = channel.queue_bind(queue, exchange, routing_key, args);
104 if r.is_ok(){
105 Ok(true)
106 }else{
107 Err(ExchangeError)
108 }
109 }
110
111 fn send_task(config: CeleryConfig, channel: &Channel, props: Properties, headers: Headers, body: MessageBody, exchange: Option<String>, routing_key: Option<String>) -> Result<bool, PublishError> {
113 let cfg = config.clone();
114 let mut amq_properties = props.convert_to_amqp_properties();
115 let amq_headers = headers.convert_to_btree_map();
116 let json_val = Value::from(body.convert_to_json_map());
117 let mut json_message = to_string(&json_val);
118 if json_message.is_ok() {
119 let mut m_routing_key = cfg.default_routing_key.clone();
120 let mut m_exchange = cfg.default_routing_key;
121 if exchange.is_some(){
122 m_exchange = exchange.unwrap();
123 }
124 if routing_key.is_some(){
125 m_routing_key = routing_key.unwrap();
126 }
127 amq_properties = amq_properties.with_headers(amq_headers);
128 let jmessage = json_message.unwrap();
129 let jbytes = jmessage.as_bytes();
130 let mut publish = Publish::with_properties(jbytes, m_routing_key, amq_properties);
131 channel.basic_publish(m_exchange, publish);
132 Ok(true)
133 }else{
134 let e = PublishError;
135 Err(e)
136 }
137 }
138}
139
140
141impl RabbitMQBroker{
143
144 pub fn start_task(&self) {
146
147 }
148
149 pub fn new(config: CeleryConfig) -> RabbitMQBroker{
151 RabbitMQBroker{
152 config: config.clone(),
153 }
154 }
155}
156
157
158#[cfg(test)]
159mod tests {
160 use std::borrow::BorrowMut;
161 use std::ops::Deref;
162 use std::thread;
163 use std::thread::JoinHandle;
164
165 use amq_protocol::frame::AMQPFrameType::Header;
166 use tokio::prelude::*;
167 use tokio::runtime::Runtime;
168 use uuid::Uuid;
169
170 use crate::amqp::amqp::AMQPConnectionInf;
171 use crate::backend::config::BackendConfig;
172 use crate::broker::broker::{AMQPBroker, RabbitMQBroker};
173 use crate::config::config::CeleryConfig;
174 use crate::connection::rabbitmq_connection_pool::ThreadableRabbitMQConnectionPool;
175
176 use super::*;
177
178 fn get_config() -> CeleryConfig {
179 let protocol = "amqp".to_string();
180 let host = "127.0.0.1".to_string();
181 let port = 5672;
182 let vhost = Some("test".to_string());
183 let username = Some("dev".to_string());
184 let password = Some("rtp*4500".to_string());
185 let broker_conn = AMQPConnectionInf::new(protocol, host, port, vhost, username, password, false);
186 let backend = BackendConfig{
187 url: "rpc://".to_string(),
188 username: None,
189 password: None,
190 transport_options: None,
191 };
192 let conf = CeleryConfig::new(broker_conn, backend);
193 conf
194 }
195
/// Declares a durable queue (and its exchange) on a pooled connection and
/// expects the call to succeed.
#[test]
fn should_create_queue() {
    let conf = get_config();
    let mut pool = ThreadableRabbitMQConnectionPool::new(conf.connection_inf.clone(), 2);
    pool.start();
    // Fail with a readable message instead of `assert!(false)`.
    let mut c = pool
        .get_connection()
        .expect("could not obtain a RabbitMQ connection from the pool");
    let channel = c.connection.open_channel(None).unwrap();
    let uuid = Uuid::new_v4().to_string();
    let rq = RabbitMQBroker::create_queue(
        conf,
        &channel,
        true,
        String::from("test_queue"),
        true,
        uuid,
        Some("test_exchange".to_string()),
        Some("test_routing_key".to_string()),
    );
    // Best-effort cleanup before asserting so the connection is closed even
    // when the assertion below fails.
    let _ = c.connection.close();
    assert!(rq.is_ok(), "declaring and binding test_queue failed");
}
215
/// Declares a queue, then binds it to the exchange explicitly, and expects
/// both steps to succeed.
#[test]
fn should_create_and_bind_queue_to_exchange() {
    let conf = get_config();
    let mut pool = ThreadableRabbitMQConnectionPool::new(conf.connection_inf.clone(), 2);
    pool.start();
    let mut c = pool
        .get_connection()
        .expect("could not obtain a RabbitMQ connection from the pool");
    let channel = c.connection.open_channel(None).unwrap();
    let uuid = Uuid::new_v4().to_string();
    let rq = RabbitMQBroker::create_queue(
        conf.clone(),
        &channel,
        true,
        String::from("test_queue"),
        true,
        uuid,
        Some("test_exchange".to_string()),
        Some("test_routing_key".to_string()),
    );
    // The bind result used to be discarded, so this test could never fail on
    // a binding error despite its name.
    let bound = RabbitMQBroker::bind_to_exchange(
        conf,
        &channel,
        "test_exchange".to_string(),
        "test_queue".to_string(),
        "test_routing_key".to_string(),
    );
    // Best-effort cleanup before asserting.
    let _ = c.connection.close();
    assert!(rq.is_ok(), "declaring and binding test_queue failed");
    assert!(bound.is_ok(), "binding test_queue to test_exchange failed");
}
236
/// Declares the test queue, publishes one task message to it and expects both
/// operations to succeed.
#[test]
fn should_send_task_to_queue() {
    let conf = get_config();
    let mut pool = ThreadableRabbitMQConnectionPool::new(conf.connection_inf.clone(), 2);
    pool.start();
    let mut c = pool
        .get_connection()
        .expect("could not obtain a RabbitMQ connection from the pool");
    let channel = c.connection.open_channel(None).unwrap();

    let queue_uuid = Uuid::new_v4().to_string();
    let rq = RabbitMQBroker::create_queue(
        conf.clone(),
        &channel,
        true,
        String::from("test_queue"),
        true,
        queue_uuid,
        Some("test_exchange".to_string()),
        Some("test_routing_key".to_string()),
    );

    let body = MessageBody::new(None, None, None, None);
    // One fresh id is reused for both header id arguments and the properties.
    let ustr = Uuid::new_v4().to_string();
    let headers = Headers::new(
        "rs".to_string(),
        "test_task".to_string(),
        ustr.clone(),
        ustr.clone(),
    );
    let props = Properties::new(
        ustr,
        "application/json".to_string(),
        "utf-8".to_string(),
        None,
    );
    let br = RabbitMQBroker::send_task(
        conf,
        &channel,
        props,
        headers,
        body,
        Some("test_exchange".to_string()),
        Some("test_routing_key".to_string()),
    );
    // Best-effort cleanup before asserting.
    let _ = c.connection.close();
    assert!(br.is_ok(), "publishing the task failed");
    assert!(rq.is_ok(), "declaring and binding test_queue failed");
}
268
/// Runs three worker threads, each with its own channel on one shared
/// connection, that repeatedly declare the test queue and publish a task.
///
/// The test fails when no connection is available, when any broker call in a
/// worker fails, or when a worker panics.
#[test]
fn should_work_with_threads() {
    const WORKERS: usize = 3;
    const ITERATIONS: usize = 2666;

    /// Declares the test queue and publishes one task, `iterations` times,
    /// panicking on the first broker error so the joining thread sees it.
    fn declare_and_publish(conf: CeleryConfig, channel: Channel, iterations: usize) {
        for _ in 0..iterations {
            let queue_uuid = Uuid::new_v4().to_string();
            let rq = RabbitMQBroker::create_queue(
                conf.clone(),
                &channel,
                true,
                String::from("test_queue"),
                true,
                queue_uuid,
                Some("test_exchange".to_string()),
                Some("test_routing_key".to_string()),
            );
            assert!(rq.is_ok(), "declaring and binding test_queue failed");

            let body = MessageBody::new(None, None, None, None);
            let ustr = Uuid::new_v4().to_string();
            let headers = Headers::new(
                "rs".to_string(),
                "test_task".to_string(),
                ustr.clone(),
                ustr.clone(),
            );
            let props = Properties::new(
                ustr,
                "application/json".to_string(),
                "utf-8".to_string(),
                None,
            );
            let br = RabbitMQBroker::send_task(
                conf.clone(),
                &channel,
                props,
                headers,
                body,
                Some("test_exchange".to_string()),
                Some("test_routing_key".to_string()),
            );
            assert!(br.is_ok(), "publishing the task failed");
        }
    }

    let cnf = get_config();
    let mut pool = ThreadableRabbitMQConnectionPool::new(cnf.connection_inf.clone(), 2);
    pool.start();
    // Previously a missing connection made the test pass without doing anything.
    let mut c = pool
        .get_connection()
        .expect("could not obtain a RabbitMQ connection from the pool");

    // Channels are opened on this thread (that needs `&mut` access to the
    // connection) and then moved into their worker, one at a time.
    let mut workers = Vec::with_capacity(WORKERS);
    for _ in 0..WORKERS {
        let channel = c.connection.open_channel(None).unwrap();
        let conf = cnf.clone();
        workers.push(thread::spawn(move || {
            declare_and_publish(conf, channel, ITERATIONS)
        }));
    }

    // Join every worker before judging the outcome so none is left running.
    let outcomes: Vec<_> = workers.into_iter().map(|worker| worker.join()).collect();
    c.connection.server_properties();
    assert!(
        outcomes.iter().all(|outcome| outcome.is_ok()),
        "at least one worker thread panicked"
    );
}
345
/// Spawns 8000 tasks on a tokio runtime; each task gets its own channel,
/// declares the test queue and publishes one message.
///
/// The test fails when no connection is available or when any task reports a
/// broker error.
#[test]
fn should_work_with_tokio() {
    let rt = Runtime::new().unwrap();
    let cnf = get_config();
    let mut pool = ThreadableRabbitMQConnectionPool::new(cnf.connection_inf.clone(), 2);
    pool.start();
    // Previously a missing connection made the test pass without doing anything.
    let mut c = pool
        .get_connection()
        .expect("could not obtain a RabbitMQ connection from the pool");

    // Tasks record failures here; a panic inside a task is not guaranteed to
    // reach the test thread, so the count is asserted after shutdown instead.
    let failures = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));

    for _ in 0..8000 {
        let channel = c.connection.open_channel(None).unwrap();
        let conf = cnf.clone();
        let failures = std::sync::Arc::clone(&failures);
        let _handle = rt.spawn(async move {
            let queue_uuid = Uuid::new_v4().to_string();
            let rq = RabbitMQBroker::create_queue(
                conf.clone(),
                &channel,
                true,
                String::from("test_queue"),
                true,
                queue_uuid,
                Some("test_exchange".to_string()),
                Some("test_routing_key".to_string()),
            );

            let body = MessageBody::new(None, None, None, None);
            let ustr = Uuid::new_v4().to_string();
            let headers = Headers::new(
                "rs".to_string(),
                "test_task".to_string(),
                ustr.clone(),
                ustr.clone(),
            );
            let props = Properties::new(
                ustr,
                "application/json".to_string(),
                "utf-8".to_string(),
                None,
            );
            let br = RabbitMQBroker::send_task(
                conf,
                &channel,
                props,
                headers,
                body,
                Some("test_exchange".to_string()),
                Some("test_routing_key".to_string()),
            );

            if rq.is_err() || br.is_err() {
                failures.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            }
        });
    }

    // NOTE(review): relies on `shutdown_on_idle` blocking until every spawned
    // task has finished — confirm for the tokio version pinned by this crate.
    rt.shutdown_on_idle();
    c.connection.server_properties();
    assert_eq!(
        failures.load(std::sync::atomic::Ordering::SeqCst),
        0,
        "some tasks failed to declare the queue or publish"
    );
}
380}