1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
use deadpool_lapin::lapin;
use deadpool_lapin::lapin::{
options::{
BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, BasicQosOptions,
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
},
types::FieldTable,
BasicProperties, Consumer, ExchangeKind, Queue,
};
use futures::StreamExt;
#[derive(Clone)]
pub struct RabbitClient {
pub channel: lapin::Channel,
}
// direct: 如果路由键匹配的话消息就会被投递到对应的队列。
// fanout: 广播消息到多个队列, 即将消息投递到所有附加在此交换器上的所有队列。
// topic: 将多个不同源头的消息投递到同一个队列。
impl RabbitClient {
// 创建队列
// queue_declare 如果指定的队列不存在,则创建该队列;
// 如果您为队列名称指定空字符串,RabbitMQ 将创建一个新队列、生成名称并返回它。
// 如果存在且与指定的设置匹配,则不执行任何操作。
// exclusive: 通过指定 Exclusive,队列将对消费者是独占的。
pub async fn queue_declare(&self, queue_name: &str, exclusive: bool) -> anyhow::Result<Queue> {
let opt = if exclusive {
QueueDeclareOptions {
// 通过指定 Exclusive,队列将对消费者是独占的。(独立队列)
exclusive: true,
..Default::default()
}
} else {
QueueDeclareOptions::default()
};
Ok(self
.channel
.queue_declare(queue_name, opt, FieldTable::default())
.await?)
}
// direct 实现 使用不同的 routing key 多次绑定同一个 exchange 和 queue。
// 直接交换类型的工作原理如下:
// 1. 消息队列使用路由键 K 绑定到交换器。
// 2. 发布者向交换器发送带有路由键 R 的消息。
// 3. 如果 K = R,则消息将传递到消息队列。
// 服务器必须实现直接交换类型,并且必须在每个虚拟主机内预先声明至少两个直接交换:
// 一个名为 amq.direct,另一个没有服务的公共名称作为 Publish 方法的默认交换
//
// fanout 实现所谓的 Pub/Sub 模式(广播消息到多个队列),其中每个消费者都有一个独立的队列,并将一条消息发送到多个队列。
//
// Topic 交换器中的路由键可以使用句点 . 例如: aaa.bbb.ccc 等等
// 可以用 * 匹配一个级别,也可以用 # 匹配任何级别。
// 例如1: 消费者1 绑定 tracking.# 键。这将订阅任何跟踪域上的事件。
// 例如2: 消费者2 绑定 *.tenant.* 键 (<domain>.<tenant>.<event>)。无论域或事件类型如何,都以租户 tenant 为目标。
pub async fn exchange_declare(
&self,
exchange_name: &str,
exchange_kind: ExchangeKind,
) -> anyhow::Result<()> {
self.channel
.exchange_declare(
exchange_name,
exchange_kind,
ExchangeDeclareOptions::default(),
FieldTable::default(),
)
.await?;
Ok(())
}
pub async fn queue_bind(
&self,
queue_name: &str,
exchange_name: &str,
routing_key: &str,
) -> anyhow::Result<()> {
// 通过绑定交换器和队列,消息将被发布。
self.channel
.queue_bind(
queue_name,
exchange_name,
routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
Ok(())
}
// exchange: 如果将交换器指定一个空字符串,消息将发送到默认交换器。
// routing_key: 如果 exchange 为空 且 routing_key 和 队列的名字 一样则消息将发送到同名队列
// 即: 发送到默认交换器的消息,它将引用发布消息时的 routing_key,并将消息路由到和 routing_key 同名的队列中。
pub async fn publish(
&self,
exchange: &str,
routing_key: &str,
message: &str,
) -> anyhow::Result<()> {
self.channel
.basic_publish(
// ------------------------------
// 交换器是虚拟主机内的消息路由代理。
// ------------------------------
// 交换器-AMQP 3.1.3: 理。
// ------------------------------
// 交换器实例(我们通常称为“交换器”)接受消息和路由信息(主要是路由键)并将消息传递到消息队列或内部服务。
// 发布消息时,发布者将消息发布到这个交换器而不是队列。
// 如果您为交换器指定一个空字符串,它将指向默认交换器。
// 默认交换器是一种称为直接交换器的交换器,它引用发布时的 routing_key,将消息路由到同名的队列中。
// 所以,最终,如果发布者为 routing_key 指定 queue,而不指定 exchange,则可以将消息发布到目标队列。
// ------------------------------
// 交换器-AMQP 3.1.3.1: (直接交换类型的工作原理如下)。
// ------------------------------
// 1. 消息队列使用路由键 K 绑定到交换器。
// 2. 发布者向交换器发送带有路由密钥 R 的消息。
// 3. 如果 K = R,则消息将传递到消息队列。
// 服务器必须实现直接交换类型,并且必须在每个虚拟主机内预先声明至少两个直接交换:
// 一个名为 amq.direct,另一个没有服务的公共名称作为 Publish 方法的默认交换
exchange,
routing_key,
BasicPublishOptions::default(),
message.as_bytes(),
BasicProperties::default(),
)
.await?;
Ok(())
}
pub async fn send_message(
&self,
exchange: &str,
routing_key: &str,
message: &str,
response_queue_name: &str,
correlation_id: &str,
) -> anyhow::Result<()> {
self.channel
.basic_publish(
// ------------------------------
// 交换器是虚拟主机内的消息路由代理。
// ------------------------------
// 交换器-AMQP 3.1.3: 理。
// ------------------------------
// 交换器实例(我们通常称为“交换器”)接受消息和路由信息(主要是路由键)并将消息传递到消息队列或内部服务。
// 发布消息时,发布者将消息发布到这个交换器而不是队列。
// 如果您为交换器指定一个空字符串,它将指向默认交换器。
// 默认交换器是一种称为直接交换器的交换器,它引用发布时的 routing_key,将消息路由到同名的队列中。
// 所以,最终,如果发布者为 routing_key 指定 queue,而不指定 exchange,则可以将消息发布到目标队列。
// ------------------------------
// 交换器-AMQP 3.1.3.1: (直接交换类型的工作原理如下)。
// ------------------------------
// 1. 消息队列使用路由键 K 绑定到交换器。
// 2. 发布者向交换器发送带有路由密钥 R 的消息。
// 3. 如果 K = R,则消息将传递到消息队列。
// 服务器必须实现直接交换类型,并且必须在每个虚拟主机内预先声明至少两个直接交换:
// 一个名为 amq.direct,另一个没有服务的公共名称作为 Publish 方法的默认交换
exchange,
routing_key,
BasicPublishOptions::default(),
message.as_bytes(),
BasicProperties::default()
.with_reply_to(response_queue_name.into())
.with_correlation_id(correlation_id.into()),
)
.await?;
Ok(())
}
// Spawn consumer task.
// let consumer_handle = tokio::spawn({
// let channel = channel.clone();
// async move {
// if let Err(err) = consumer(channel).await {
// tracing::error!("{err:?}");
// }
// }
// });
//
// Wait forever
// consumer_handle.await?;
pub async fn consumer(&self, queue_name: &str, consumer_tag: &str) -> anyhow::Result<Consumer> {
// basic_qos 指定消费者一次从队列中检索多少条消息。由于这里指定了1.
// 队列中的消息会按顺序分发给消费者1和2。
self.channel
.basic_qos(1, BasicQosOptions::default())
.await?;
let consumer = self
.channel
.basic_consume(
queue_name,
consumer_tag,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
Ok(consumer)
}
pub async fn consumer_loop(
&self,
name: &str,
queue_name: &str,
mut consumer: Consumer,
) -> anyhow::Result<()> {
if let Some(Ok(delivery)) = consumer.next().await {
let data = String::from_utf8_lossy(delivery.data.as_slice());
log::info!(
"consumer: name={}, queue_name={}, data={}",
name,
queue_name,
data
);
if let Some(correlation_id) = delivery.properties.correlation_id() {
self.channel
.basic_publish(
"",
delivery.properties.reply_to().clone().unwrap().as_str(),
BasicPublishOptions::default(),
"Ok".as_bytes(),
BasicProperties::default().with_correlation_id(correlation_id.to_owned()),
)
.await?;
} else {
// 消费者通知 RabbitMQ 服务器消息处理成功,并从队列中删除该消息。
if let Err(e) = delivery.ack(BasicAckOptions::default()).await {
log::error!("consumer-ack: queue_name={}, error={:?}", queue_name, e);
}
}
}
Ok(())
}
}