1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use crate::configuration::config_model::ConnectionProperties;
use crate::consumer::connection_manager::build_url;
use crate::consumer::connection_manager::create_channel;
use amq_protocol_types::ShortString;
use lapin::options::BasicPublishOptions;
use lapin::publisher_confirm::PublisherConfirm;
use lapin::BasicProperties;
#[derive(Debug)]
pub struct PublishError {
pub why: lapin::Error,
}
pub async fn publish(
message: String,
exchange_name: &str,
routing_key: &str,
connection: ConnectionProperties,
) -> Result<PublisherConfirm, PublishError> {
let channel = create_channel(build_url(connection.clone()).as_str(), connection.retry)
.await
.expect("channel to be created");
let outcome = match channel
.basic_publish(
exchange_name,
routing_key,
BasicPublishOptions::default(),
message.as_bytes().to_vec(),
BasicProperties::default(),
)
.await
{
Ok(result) => Ok(result),
Err(why) => Err(PublishError { why }),
};
let _closed = channel.close(0, "shutting down").await;
return outcome;
}
pub async fn publish_with_type(
message: String,
exchange_name: &str,
routing_key: &str,
connection: ConnectionProperties,
message_type: &str,
) -> Result<PublisherConfirm, PublishError> {
let channel = create_channel(build_url(connection.clone()).as_str(), connection.retry)
.await
.expect("channel to be created");
let base_properties = BasicProperties::default();
let final_properties = base_properties.with_content_type(ShortString::from(message_type));
let outcome = match channel
.basic_publish(
exchange_name,
routing_key,
BasicPublishOptions::default(),
message.as_bytes().to_vec(),
final_properties,
)
.await
{
Ok(result) => Ok(result),
Err(why) => Err(PublishError { why }),
};
let _closed = channel.close(0, "shutting down").await;
return outcome;
}