1use std::cmp::Ordering;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE;
21use rocketmq_common::common::message::message_decoder;
22use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
23use rocketmq_common::common::message::MessageConst;
24use rocketmq_common::common::message::MessageTrait;
25use rocketmq_common::common::mix_all::MASTER_ID;
26use rocketmq_common::common::pop_ack_constants::PopAckConstants;
27use rocketmq_common::common::FAQUrl;
28use rocketmq_common::TimeUtils::get_current_millis;
29use rocketmq_error::RocketMQResult;
30use rocketmq_remoting::code::request_code::RequestCode;
31use rocketmq_remoting::code::response_code::ResponseCode;
32use rocketmq_remoting::net::channel::Channel;
33use rocketmq_remoting::protocol::body::batch_ack::BatchAck;
34use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody;
35use rocketmq_remoting::protocol::header::ack_message_request_header::AckMessageRequestHeader;
36use rocketmq_remoting::protocol::header::extra_info_util::ExtraInfoUtil;
37use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
38use rocketmq_remoting::protocol::RemotingDeserializable;
39use rocketmq_remoting::protocol::RemotingSerializable;
40use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
41use rocketmq_rust::ArcMut;
42use rocketmq_store::base::message_status_enum::PutMessageStatus;
43use rocketmq_store::base::message_store::MessageStore;
44use rocketmq_store::pop::ack_msg::AckMsg;
45use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
46use rocketmq_store::pop::AckMessage;
47use tracing::error;
48use tracing::warn;
49
50use crate::broker_runtime::BrokerRuntimeInner;
51use crate::processor::pop_message_processor::PopMessageProcessor;
52use crate::processor::processor_service::pop_revive_service::PopReviveService;
53
54pub struct AckMessageProcessor<MS: MessageStore> {
55 pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
56 broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
57 revive_topic: CheetahString,
58 pop_revive_services: Vec<ArcMut<PopReviveService<MS>>>,
59}
60
61impl<MS> AckMessageProcessor<MS>
62where
63 MS: MessageStore,
64{
65 pub fn new(
66 broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
67 pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
68 ) -> AckMessageProcessor<MS> {
69 let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
70 broker_runtime_inner
71 .broker_config()
72 .broker_identity
73 .broker_cluster_name
74 .as_str(),
75 ));
76 let mut pop_revive_services = vec![];
77 let is_run_pop_revive = broker_runtime_inner
78 .broker_config()
79 .broker_identity
80 .broker_id
81 == MASTER_ID;
82
83 for i in 0..broker_runtime_inner.broker_config().revive_queue_num {
85 let mut pop_revive_service =
86 PopReviveService::new(revive_topic.clone(), i as i32, broker_runtime_inner.clone());
87 pop_revive_service.set_should_run_pop_revive(is_run_pop_revive);
88 pop_revive_services.push(ArcMut::new(pop_revive_service));
89 }
90 AckMessageProcessor {
91 pop_message_processor,
92 broker_runtime_inner,
93 revive_topic,
94 pop_revive_services,
95 }
96 }
97
98 pub async fn process_request(
99 &mut self,
100 channel: Channel,
101 ctx: ConnectionHandlerContext,
102 request_code: RequestCode,
103 request: RemotingCommand,
104 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
105 match request_code {
106 RequestCode::AckMessage => self.process_ack(channel, ctx, request, true).await,
107 RequestCode::BatchAckMessage => {
108 self.process_batch_ack(channel, ctx, request, true).await
109 }
110 _ => {
111 error!(
112 "AckMessageProcessor failed to process RequestCode: {}, consumer: {} ",
113 request_code.to_i32(),
114 channel.remote_address()
115 );
116 Ok(Some(
117 RemotingCommand::create_response_command_with_code_remark(
118 ResponseCode::MessageIllegal,
119 format!(
120 "AckMessageProcessor failed to process RequestCode: {request_code:?}",
121 ),
122 ),
123 ))
124 }
125 }
126 }
127
128 pub fn start(&mut self) {
129 for pop_revive_service in self.pop_revive_services.iter() {
130 PopReviveService::start(pop_revive_service.clone());
131 }
132 }
133
134 pub fn set_pop_revive_service_status(&mut self, status: bool) {
135 for pop_revive_service in self.pop_revive_services.iter_mut() {
136 pop_revive_service.set_should_run_pop_revive(status);
137 }
138 }
139}
140
141impl<MS> AckMessageProcessor<MS>
142where
143 MS: MessageStore,
144{
145 async fn process_ack(
146 &mut self,
147 channel: Channel,
148 _ctx: ConnectionHandlerContext,
149 request: RemotingCommand,
150 _broker_allow_suspend: bool,
151 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
152 let request_header = request.decode_command_custom_header::<AckMessageRequestHeader>()?;
153 let topic_config = self
154 .broker_runtime_inner
155 .topic_config_manager()
156 .select_topic_config(&request_header.topic);
157 if topic_config.is_none() {
158 error!(
159 "topic[{}] not exist, consumer: {},apply first please! {}",
160 request_header.topic,
161 channel.remote_address(),
162 FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
163 );
164 return Ok(Some(
165 RemotingCommand::create_response_command_with_code_remark(
166 ResponseCode::TopicNotExist,
167 format!(
168 "topic[{}] not exist, apply first please! {}",
169 request_header.topic,
170 FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
171 ),
172 ),
173 ));
174 }
175 let topic_config = topic_config.unwrap();
176 if request_header.queue_id >= topic_config.read_queue_nums as i32
177 || request_header.queue_id < 0
178 {
179 let error_msg = format!(
180 "queueId{}] is illegal, topic:[{}] topicConfig.readQueueNums:[{}] consumer:[{}]",
181 request_header.queue_id,
182 request_header.topic,
183 topic_config.read_queue_nums,
184 channel.remote_address()
185 );
186 warn!("{}", error_msg);
187
188 return Ok(Some(
189 RemotingCommand::create_response_command_with_code_remark(
190 ResponseCode::MessageIllegal,
191 error_msg,
192 ),
193 ));
194 }
195 let message_store_inner = self.broker_runtime_inner.message_store().as_ref().unwrap();
196 let min_offset = message_store_inner
197 .get_min_offset_in_queue(&request_header.topic, request_header.queue_id);
198 let max_offset = message_store_inner
199 .get_max_offset_in_queue(&request_header.topic, request_header.queue_id);
200 if request_header.offset < min_offset || request_header.offset > max_offset {
201 let error_msg = format!(
202 "request offset not in queue offset range, request offset: {}, min offset: {}, \
203 max offset: {}",
204 request_header.offset, min_offset, max_offset
205 );
206 warn!("{}", error_msg);
207
208 return Ok(Some(
209 RemotingCommand::create_response_command_with_code_remark(
210 ResponseCode::NoMessage,
211 error_msg,
212 ),
213 ));
214 }
215 let mut response = RemotingCommand::create_response_command();
216 self.append_ack(Some(request_header), &mut response, None, &channel, None)
217 .await?;
218 Ok(Some(response))
219 }
220
221 async fn process_batch_ack(
222 &mut self,
223 _channel: Channel,
224 _ctx: ConnectionHandlerContext,
225 request: RemotingCommand,
226 _broker_allow_suspend: bool,
227 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
228 if request.get_body().is_none() {
229 return Ok(Some(RemotingCommand::create_response_command_with_code(
230 ResponseCode::NoMessage,
231 )));
232 }
233 let req_body = BatchAckMessageRequestBody::decode(request.get_body().unwrap())?;
234 if req_body.acks.is_empty() {
235 return Ok(Some(RemotingCommand::create_response_command_with_code(
236 ResponseCode::NoMessage,
237 )));
238 }
239 let mut response = RemotingCommand::create_response_command();
240 let broker_name = &req_body.broker_name;
241 for ack in req_body.acks {
242 self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name))
243 .await?;
244 }
245 Ok(Some(response))
246 }
247
248 async fn append_ack(
249 &mut self,
250 request_header: Option<AckMessageRequestHeader>,
251 response: &mut RemotingCommand,
252 batch_ack: Option<BatchAck>,
253 channel: &Channel,
254 broker_name: Option<&CheetahString>,
255 ) -> RocketMQResult<()> {
256 let (
258 consume_group,
259 topic,
260 qid,
261 r_qid,
262 start_offset,
263 ack_offset,
264 pop_time,
265 invisible_time,
266 ack_count,
267 mut ack_msg,
268 broker_name,
269 ) = if let Some(request_header) = request_header {
270 let extra_info = ExtraInfoUtil::split(request_header.extra_info.as_str());
271 let broker_name = ExtraInfoUtil::get_broker_name(extra_info.as_slice())?;
272 let consume_group = request_header.consumer_group;
273 let topic = request_header.topic;
274 let qid = request_header.queue_id;
275 let r_qid = ExtraInfoUtil::get_revive_qid(extra_info.as_slice())?;
276 let start_offset = ExtraInfoUtil::get_ck_queue_offset(extra_info.as_slice())?;
277 let ack_offset = request_header.offset;
278 let pop_time = ExtraInfoUtil::get_pop_time(extra_info.as_slice())?;
279 let invisible_time = ExtraInfoUtil::get_invisible_time(extra_info.as_slice())?;
280 if r_qid == POP_ORDER_REVIVE_QUEUE {
281 self.ack_orderly(
282 topic,
283 consume_group,
284 qid,
285 ack_offset,
286 pop_time,
287 invisible_time,
288 channel,
289 response,
290 )
291 .await;
292 return Ok(());
293 }
294 let ack = AckMsg::default();
295 let ack_count = 1;
296 (
297 consume_group,
298 topic,
299 qid,
300 r_qid,
301 start_offset,
302 ack_offset,
303 pop_time,
304 invisible_time,
305 ack_count,
306 Box::new(ack) as Box<dyn AckMessage + Send>,
307 CheetahString::from(broker_name),
308 )
309 } else {
310 let batch_ack = batch_ack.unwrap();
312 let consumer_group = batch_ack.consumer_group;
313 let topic = CheetahString::from(ExtraInfoUtil::get_real_topic_with_retry(
314 batch_ack.topic.as_str(),
315 consumer_group.as_str(),
316 batch_ack.retry.as_str(),
317 )?);
318 let qid = batch_ack.queue_id;
319 let r_qid = batch_ack.revive_queue_id;
320 let start_offset = batch_ack.start_offset;
321 let akc_offset = -1;
322 let pop_time = batch_ack.pop_time;
323 let invisible_time = batch_ack.invisible_time;
324 let message_store = self.broker_runtime_inner.message_store().as_ref().unwrap();
325 let min_offset = message_store.get_min_offset_in_queue(&topic, qid);
326 let max_offset = message_store.get_max_offset_in_queue(&topic, qid);
327 if min_offset == -1 || max_offset == -1 {
328 return Ok(());
330 }
331
332 let mut batch_ack_msg = BatchAckMsg::default();
333
334 let bit_set = &batch_ack.bit_set.0;
335 for i in bit_set.iter_ones() {
336 if i == usize::MAX {
337 break;
338 }
339 let offset = batch_ack.start_offset + i as i64;
340 if offset < min_offset || offset > max_offset {
341 continue;
342 }
343 if r_qid == POP_ORDER_REVIVE_QUEUE {
344 self.ack_orderly(
345 topic.clone(),
346 consumer_group.clone(),
347 qid,
348 offset,
349 pop_time,
350 invisible_time,
351 channel,
352 response,
353 )
354 .await;
355 } else {
356 batch_ack_msg.ack_offset_list.push(offset);
357 }
358 }
359 if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() {
360 return Ok(());
361 }
362 let ack_count = batch_ack_msg.ack_offset_list.len();
363 (
364 consumer_group,
365 topic,
366 qid,
367 r_qid,
368 start_offset,
369 akc_offset,
370 pop_time,
371 invisible_time,
372 ack_count,
373 Box::new(batch_ack_msg) as Box<dyn AckMessage + Send>,
374 broker_name.unwrap().clone(),
375 )
376 };
377
378 ack_msg.set_consumer_group(consume_group.clone());
382 ack_msg.set_topic(topic.clone());
383 ack_msg.set_queue_id(qid);
384 ack_msg.set_start_offset(start_offset);
385 ack_msg.set_ack_offset(ack_offset);
386 ack_msg.set_pop_time(pop_time);
387 ack_msg.set_broker_name(broker_name);
388 if self
389 .pop_message_processor
390 .pop_buffer_merge_service_mut()
391 .add_ack(r_qid, ack_msg.as_ref())
392 {
393 self.broker_runtime_inner
394 .pop_inflight_message_counter()
395 .decrement_in_flight_message_num(
396 &topic,
397 &consume_group,
398 pop_time,
399 qid,
400 ack_count as i64,
401 );
402 return Ok(());
403 }
404 let mut inner = MessageExtBrokerInner::default();
405 inner.set_topic(self.revive_topic.clone());
406 inner.message_ext_inner.queue_id = qid;
407 if let Some(batch_ack) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
408 inner.set_body(batch_ack.encode()?.into());
409 inner.set_tags(CheetahString::from_static_str(
410 PopAckConstants::BATCH_ACK_TAG,
411 ));
412 inner.put_property(
413 CheetahString::from_static_str(
414 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
415 ),
416 CheetahString::from(PopMessageProcessor::<MS>::gen_batch_ack_unique_id(
417 batch_ack,
418 )),
419 );
420 } else if let Some(ack_msg) = ack_msg.as_any().downcast_ref::<AckMsg>() {
421 inner.set_body(ack_msg.encode()?.into());
422 inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
423 inner.put_property(
424 CheetahString::from_static_str(
425 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
426 ),
427 CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
428 ack_msg as &dyn AckMessage,
429 )),
430 );
431 }
432 inner.message_ext_inner.born_timestamp = get_current_millis() as i64;
433 inner.message_ext_inner.store_host = self.broker_runtime_inner.store_host();
434 inner.message_ext_inner.born_host = self.broker_runtime_inner.store_host();
435 inner.set_delay_time_ms((pop_time + invisible_time) as u64);
436 inner.put_property(
437 CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
438 CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
439 ack_msg.as_ref(),
440 )),
441 );
442 inner.properties_string =
443 message_decoder::message_properties_to_string(inner.get_properties());
444 let put_message_result = self
445 .broker_runtime_inner
446 .escape_bridge_mut()
447 .put_message_to_specific_queue(inner)
448 .await;
449 if !matches!(
450 put_message_result.put_message_status(),
451 PutMessageStatus::PutOk
452 | PutMessageStatus::FlushDiskTimeout
453 | PutMessageStatus::FlushSlaveTimeout
454 | PutMessageStatus::SlaveNotAvailable
455 ) {
456 error!(
457 "put ack msg error:{:?}",
458 put_message_result.put_message_status()
459 );
460 }
461 self.broker_runtime_inner
462 .pop_inflight_message_counter()
463 .decrement_in_flight_message_num(
464 &topic,
465 &consume_group,
466 pop_time,
467 qid,
468 ack_count as i64,
469 );
470 Ok(())
471 }
472
473 async fn ack_orderly(
474 &mut self,
475 topic: CheetahString,
476 consume_group: CheetahString,
477 q_id: i32,
478 ack_offset: i64,
479 pop_time: i64,
480 invisible_time: i64,
481 channel: &Channel,
482 response: &mut RemotingCommand,
483 ) {
484 let lock_key = CheetahString::from_string(format!(
485 "{}{}{}{}{}",
486 &topic,
487 PopAckConstants::SPLIT,
488 &consume_group,
489 PopAckConstants::SPLIT,
490 q_id
491 ));
492 let old_offset = self
493 .broker_runtime_inner
494 .consumer_offset_manager()
495 .query_offset(&consume_group, &topic, q_id);
496 if old_offset > ack_offset {
497 return;
498 }
499 while !self
500 .pop_message_processor
501 .queue_lock_manager()
502 .try_lock_with_key(lock_key.clone())
503 .await
504 {
505 }
507 let old_offset = self
508 .broker_runtime_inner
509 .consumer_offset_manager()
510 .query_offset(&consume_group, &topic, q_id);
511 if old_offset > ack_offset {
512 return;
513 }
514 let next_offset = self
515 .broker_runtime_inner
516 .consumer_order_info_manager()
517 .commit_and_next(
518 &consume_group,
519 &topic,
520 q_id,
521 ack_offset as u64,
522 pop_time as u64,
523 );
524 match next_offset.cmp(&-1) {
525 Ordering::Less => {}
526 Ordering::Equal => {
527 let error_info = format!(
528 "offset is illegal, key:{}, old:{}, commit:{}, next:{}, {}",
529 lock_key,
530 old_offset,
531 ack_offset,
532 next_offset,
533 channel.remote_address()
534 );
535 response.set_code_ref(ResponseCode::MessageIllegal);
536 response.set_remark_mut(error_info);
537 self.pop_message_processor
538 .queue_lock_manager()
539 .unlock_with_key(lock_key)
540 .await;
541 return;
542 }
543 Ordering::Greater => {
544 if !self
545 .broker_runtime_inner
546 .consumer_offset_manager()
547 .has_offset_reset(consume_group.as_str(), topic.as_str(), q_id)
548 {
549 self.broker_runtime_inner
550 .consumer_offset_manager()
551 .commit_offset(
552 channel.remote_address().to_string().into(),
553 &consume_group,
554 &topic,
555 q_id,
556 next_offset,
557 );
558 }
559
560 if !self
561 .broker_runtime_inner
562 .consumer_order_info_manager()
563 .check_block(
564 &CheetahString::empty(),
565 &consume_group,
566 &topic,
567 q_id,
568 invisible_time as u64,
569 )
570 {
571 self.pop_message_processor.notify_message_arriving(
572 &topic,
573 q_id,
574 &consume_group,
575 );
576 }
577 }
578 }
579 self.pop_message_processor
580 .queue_lock_manager()
581 .unlock_with_key(lock_key)
582 .await;
583 self.broker_runtime_inner
584 .pop_inflight_message_counter()
585 .decrement_in_flight_message_num(&topic, &consume_group, pop_time, q_id, 1);
586 }
587
588 pub fn shutdown(&mut self) {
589 for pop_revive_service in self.pop_revive_services.iter_mut() {
590 pop_revive_service.shutdown();
591 }
592 }
593}