1use std::cmp::Ordering;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE;
21use rocketmq_common::common::message::message_decoder;
22use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
23use rocketmq_common::common::message::MessageConst;
24use rocketmq_common::common::message::MessageTrait;
25use rocketmq_common::common::mix_all::MASTER_ID;
26use rocketmq_common::common::pop_ack_constants::PopAckConstants;
27use rocketmq_common::common::FAQUrl;
28use rocketmq_common::TimeUtils::get_current_millis;
29use rocketmq_error::RocketMQResult;
30use rocketmq_remoting::code::request_code::RequestCode;
31use rocketmq_remoting::code::response_code::ResponseCode;
32use rocketmq_remoting::net::channel::Channel;
33use rocketmq_remoting::protocol::body::batch_ack::BatchAck;
34use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody;
35use rocketmq_remoting::protocol::header::ack_message_request_header::AckMessageRequestHeader;
36use rocketmq_remoting::protocol::header::extra_info_util::ExtraInfoUtil;
37use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
38use rocketmq_remoting::protocol::RemotingDeserializable;
39use rocketmq_remoting::protocol::RemotingSerializable;
40use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
41use rocketmq_remoting::runtime::processor::RequestProcessor;
42use rocketmq_rust::ArcMut;
43use rocketmq_store::base::message_status_enum::PutMessageStatus;
44use rocketmq_store::base::message_store::MessageStore;
45use rocketmq_store::pop::ack_msg::AckMsg;
46use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
47use rocketmq_store::pop::AckMessage;
48use tracing::error;
49use tracing::info;
50use tracing::warn;
51
52use crate::broker_runtime::BrokerRuntimeInner;
53use crate::processor::pop_message_processor::PopMessageProcessor;
54use crate::processor::processor_service::pop_revive_service::PopReviveService;
55
56pub struct AckMessageProcessor<MS: MessageStore> {
57 pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
58 broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
59 revive_topic: CheetahString,
60 pop_revive_services: Vec<ArcMut<PopReviveService<MS>>>,
61}
62
63impl<MS> RequestProcessor for AckMessageProcessor<MS>
64where
65 MS: MessageStore,
66{
67 async fn process_request(
68 &mut self,
69 channel: Channel,
70 ctx: ConnectionHandlerContext,
71 request: &mut RemotingCommand,
72 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
73 let request_code = RequestCode::from(request.code());
74 info!(
75 "AckMessageProcessor received request code: {:?}",
76 request_code
77 );
78 match request_code {
79 RequestCode::AckMessage | RequestCode::BatchAckMessage => {
80 self.process_request_inner(channel, ctx, request_code, request)
81 .await
82 }
83 _ => {
84 warn!(
85 "AckMessageProcessor received unknown request code: {:?}",
86 request_code
87 );
88 let response = RemotingCommand::create_response_command_with_code_remark(
89 ResponseCode::RequestCodeNotSupported,
90 format!(
91 "AckMessageProcessor request code {} not supported",
92 request.code()
93 ),
94 );
95 Ok(Some(response.set_opaque(request.opaque())))
96 }
97 }
98 }
99}
100
101impl<MS> AckMessageProcessor<MS>
102where
103 MS: MessageStore,
104{
105 pub fn new(
106 broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
107 pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
108 ) -> AckMessageProcessor<MS> {
109 let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
110 broker_runtime_inner
111 .broker_config()
112 .broker_identity
113 .broker_cluster_name
114 .as_str(),
115 ));
116 let mut pop_revive_services = vec![];
117 let is_run_pop_revive = broker_runtime_inner
118 .broker_config()
119 .broker_identity
120 .broker_id
121 == MASTER_ID;
122
123 for i in 0..broker_runtime_inner.broker_config().revive_queue_num {
125 let mut pop_revive_service =
126 PopReviveService::new(revive_topic.clone(), i as i32, broker_runtime_inner.clone());
127 pop_revive_service.set_should_run_pop_revive(is_run_pop_revive);
128 pop_revive_services.push(ArcMut::new(pop_revive_service));
129 }
130 AckMessageProcessor {
131 pop_message_processor,
132 broker_runtime_inner,
133 revive_topic,
134 pop_revive_services,
135 }
136 }
137
138 pub async fn process_request_inner(
139 &mut self,
140 channel: Channel,
141 ctx: ConnectionHandlerContext,
142 request_code: RequestCode,
143 request: &mut RemotingCommand,
144 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
145 match request_code {
146 RequestCode::AckMessage => self.process_ack(channel, ctx, request, true).await,
147 RequestCode::BatchAckMessage => {
148 self.process_batch_ack(channel, ctx, request, true).await
149 }
150 _ => {
151 error!(
152 "AckMessageProcessor failed to process RequestCode: {}, consumer: {} ",
153 request_code.to_i32(),
154 channel.remote_address()
155 );
156 Ok(Some(
157 RemotingCommand::create_response_command_with_code_remark(
158 ResponseCode::MessageIllegal,
159 format!(
160 "AckMessageProcessor failed to process RequestCode: {request_code:?}",
161 ),
162 ),
163 ))
164 }
165 }
166 }
167
168 pub fn start(&mut self) {
169 for pop_revive_service in self.pop_revive_services.iter() {
170 PopReviveService::start(pop_revive_service.clone());
171 }
172 }
173
174 pub fn set_pop_revive_service_status(&mut self, status: bool) {
175 for pop_revive_service in self.pop_revive_services.iter_mut() {
176 pop_revive_service.set_should_run_pop_revive(status);
177 }
178 }
179}
180
181impl<MS> AckMessageProcessor<MS>
182where
183 MS: MessageStore,
184{
185 async fn process_ack(
186 &mut self,
187 channel: Channel,
188 _ctx: ConnectionHandlerContext,
189 request: &mut RemotingCommand,
190 _broker_allow_suspend: bool,
191 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
192 let request_header = request.decode_command_custom_header::<AckMessageRequestHeader>()?;
193 let topic_config = self
194 .broker_runtime_inner
195 .topic_config_manager()
196 .select_topic_config(&request_header.topic);
197 if topic_config.is_none() {
198 error!(
199 "topic[{}] not exist, consumer: {},apply first please! {}",
200 request_header.topic,
201 channel.remote_address(),
202 FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
203 );
204 return Ok(Some(
205 RemotingCommand::create_response_command_with_code_remark(
206 ResponseCode::TopicNotExist,
207 format!(
208 "topic[{}] not exist, apply first please! {}",
209 request_header.topic,
210 FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
211 ),
212 ),
213 ));
214 }
215 let topic_config = topic_config.unwrap();
216 if request_header.queue_id >= topic_config.read_queue_nums as i32
217 || request_header.queue_id < 0
218 {
219 let error_msg = format!(
220 "queueId{}] is illegal, topic:[{}] topicConfig.readQueueNums:[{}] consumer:[{}]",
221 request_header.queue_id,
222 request_header.topic,
223 topic_config.read_queue_nums,
224 channel.remote_address()
225 );
226 warn!("{}", error_msg);
227
228 return Ok(Some(
229 RemotingCommand::create_response_command_with_code_remark(
230 ResponseCode::MessageIllegal,
231 error_msg,
232 ),
233 ));
234 }
235 let message_store_inner = self.broker_runtime_inner.message_store().as_ref().unwrap();
236 let min_offset = message_store_inner
237 .get_min_offset_in_queue(&request_header.topic, request_header.queue_id);
238 let max_offset = message_store_inner
239 .get_max_offset_in_queue(&request_header.topic, request_header.queue_id);
240 if request_header.offset < min_offset || request_header.offset > max_offset {
241 let error_msg = format!(
242 "request offset not in queue offset range, request offset: {}, min offset: {}, \
243 max offset: {}",
244 request_header.offset, min_offset, max_offset
245 );
246 warn!("{}", error_msg);
247
248 return Ok(Some(
249 RemotingCommand::create_response_command_with_code_remark(
250 ResponseCode::NoMessage,
251 error_msg,
252 ),
253 ));
254 }
255 let mut response = RemotingCommand::create_response_command();
256 self.append_ack(Some(request_header), &mut response, None, &channel, None)
257 .await?;
258 Ok(Some(response))
259 }
260
261 async fn process_batch_ack(
262 &mut self,
263 _channel: Channel,
264 _ctx: ConnectionHandlerContext,
265 request: &mut RemotingCommand,
266 _broker_allow_suspend: bool,
267 ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
268 if request.get_body().is_none() {
269 return Ok(Some(RemotingCommand::create_response_command_with_code(
270 ResponseCode::NoMessage,
271 )));
272 }
273 let req_body = BatchAckMessageRequestBody::decode(request.get_body().unwrap())?;
274 if req_body.acks.is_empty() {
275 return Ok(Some(RemotingCommand::create_response_command_with_code(
276 ResponseCode::NoMessage,
277 )));
278 }
279 let mut response = RemotingCommand::create_response_command();
280 let broker_name = &req_body.broker_name;
281 for ack in req_body.acks {
282 self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name))
283 .await?;
284 }
285 Ok(Some(response))
286 }
287
288 async fn append_ack(
289 &mut self,
290 request_header: Option<AckMessageRequestHeader>,
291 response: &mut RemotingCommand,
292 batch_ack: Option<BatchAck>,
293 channel: &Channel,
294 broker_name: Option<&CheetahString>,
295 ) -> RocketMQResult<()> {
296 let (
298 consume_group,
299 topic,
300 qid,
301 r_qid,
302 start_offset,
303 ack_offset,
304 pop_time,
305 invisible_time,
306 ack_count,
307 mut ack_msg,
308 broker_name,
309 ) = if let Some(request_header) = request_header {
310 let extra_info = ExtraInfoUtil::split(request_header.extra_info.as_str());
311 let broker_name = ExtraInfoUtil::get_broker_name(extra_info.as_slice())?;
312 let consume_group = request_header.consumer_group;
313 let topic = request_header.topic;
314 let qid = request_header.queue_id;
315 let r_qid = ExtraInfoUtil::get_revive_qid(extra_info.as_slice())?;
316 let start_offset = ExtraInfoUtil::get_ck_queue_offset(extra_info.as_slice())?;
317 let ack_offset = request_header.offset;
318 let pop_time = ExtraInfoUtil::get_pop_time(extra_info.as_slice())?;
319 let invisible_time = ExtraInfoUtil::get_invisible_time(extra_info.as_slice())?;
320 if r_qid == POP_ORDER_REVIVE_QUEUE {
321 self.ack_orderly(
322 topic,
323 consume_group,
324 qid,
325 ack_offset,
326 pop_time,
327 invisible_time,
328 channel,
329 response,
330 )
331 .await;
332 return Ok(());
333 }
334 let ack = AckMsg::default();
335 let ack_count = 1;
336 (
337 consume_group,
338 topic,
339 qid,
340 r_qid,
341 start_offset,
342 ack_offset,
343 pop_time,
344 invisible_time,
345 ack_count,
346 Box::new(ack) as Box<dyn AckMessage + Send>,
347 CheetahString::from(broker_name),
348 )
349 } else {
350 let batch_ack = batch_ack.unwrap();
352 let consumer_group = batch_ack.consumer_group;
353 let topic = CheetahString::from(ExtraInfoUtil::get_real_topic_with_retry(
354 batch_ack.topic.as_str(),
355 consumer_group.as_str(),
356 batch_ack.retry.as_str(),
357 )?);
358 let qid = batch_ack.queue_id;
359 let r_qid = batch_ack.revive_queue_id;
360 let start_offset = batch_ack.start_offset;
361 let akc_offset = -1;
362 let pop_time = batch_ack.pop_time;
363 let invisible_time = batch_ack.invisible_time;
364 let message_store = self.broker_runtime_inner.message_store().as_ref().unwrap();
365 let min_offset = message_store.get_min_offset_in_queue(&topic, qid);
366 let max_offset = message_store.get_max_offset_in_queue(&topic, qid);
367 if min_offset == -1 || max_offset == -1 {
368 return Ok(());
370 }
371
372 let mut batch_ack_msg = BatchAckMsg::default();
373
374 let bit_set = &batch_ack.bit_set.0;
375 for i in bit_set.iter_ones() {
376 if i == usize::MAX {
377 break;
378 }
379 let offset = batch_ack.start_offset + i as i64;
380 if offset < min_offset || offset > max_offset {
381 continue;
382 }
383 if r_qid == POP_ORDER_REVIVE_QUEUE {
384 self.ack_orderly(
385 topic.clone(),
386 consumer_group.clone(),
387 qid,
388 offset,
389 pop_time,
390 invisible_time,
391 channel,
392 response,
393 )
394 .await;
395 } else {
396 batch_ack_msg.ack_offset_list.push(offset);
397 }
398 }
399 if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() {
400 return Ok(());
401 }
402 let ack_count = batch_ack_msg.ack_offset_list.len();
403 (
404 consumer_group,
405 topic,
406 qid,
407 r_qid,
408 start_offset,
409 akc_offset,
410 pop_time,
411 invisible_time,
412 ack_count,
413 Box::new(batch_ack_msg) as Box<dyn AckMessage + Send>,
414 broker_name.unwrap().clone(),
415 )
416 };
417
418 ack_msg.set_consumer_group(consume_group.clone());
422 ack_msg.set_topic(topic.clone());
423 ack_msg.set_queue_id(qid);
424 ack_msg.set_start_offset(start_offset);
425 ack_msg.set_ack_offset(ack_offset);
426 ack_msg.set_pop_time(pop_time);
427 ack_msg.set_broker_name(broker_name);
428 if self
429 .pop_message_processor
430 .pop_buffer_merge_service_mut()
431 .add_ack(r_qid, ack_msg.as_ref())
432 {
433 self.broker_runtime_inner
434 .pop_inflight_message_counter()
435 .decrement_in_flight_message_num(
436 &topic,
437 &consume_group,
438 pop_time,
439 qid,
440 ack_count as i64,
441 );
442 return Ok(());
443 }
444 let mut inner = MessageExtBrokerInner::default();
445 inner.set_topic(self.revive_topic.clone());
446 inner.message_ext_inner.queue_id = qid;
447 if let Some(batch_ack) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
448 inner.set_body(batch_ack.encode()?.into());
449 inner.set_tags(CheetahString::from_static_str(
450 PopAckConstants::BATCH_ACK_TAG,
451 ));
452 inner.put_property(
453 CheetahString::from_static_str(
454 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
455 ),
456 CheetahString::from(PopMessageProcessor::<MS>::gen_batch_ack_unique_id(
457 batch_ack,
458 )),
459 );
460 } else if let Some(ack_msg) = ack_msg.as_any().downcast_ref::<AckMsg>() {
461 inner.set_body(ack_msg.encode()?.into());
462 inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
463 inner.put_property(
464 CheetahString::from_static_str(
465 MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
466 ),
467 CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
468 ack_msg as &dyn AckMessage,
469 )),
470 );
471 }
472 inner.message_ext_inner.born_timestamp = get_current_millis() as i64;
473 inner.message_ext_inner.store_host = self.broker_runtime_inner.store_host();
474 inner.message_ext_inner.born_host = self.broker_runtime_inner.store_host();
475 inner.set_delay_time_ms((pop_time + invisible_time) as u64);
476 inner.put_property(
477 CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
478 CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
479 ack_msg.as_ref(),
480 )),
481 );
482 inner.properties_string =
483 message_decoder::message_properties_to_string(inner.get_properties());
484 let put_message_result = self
485 .broker_runtime_inner
486 .escape_bridge_mut()
487 .put_message_to_specific_queue(inner)
488 .await;
489 if !matches!(
490 put_message_result.put_message_status(),
491 PutMessageStatus::PutOk
492 | PutMessageStatus::FlushDiskTimeout
493 | PutMessageStatus::FlushSlaveTimeout
494 | PutMessageStatus::SlaveNotAvailable
495 ) {
496 error!(
497 "put ack msg error:{:?}",
498 put_message_result.put_message_status()
499 );
500 }
501 self.broker_runtime_inner
502 .pop_inflight_message_counter()
503 .decrement_in_flight_message_num(
504 &topic,
505 &consume_group,
506 pop_time,
507 qid,
508 ack_count as i64,
509 );
510 Ok(())
511 }
512
513 async fn ack_orderly(
514 &mut self,
515 topic: CheetahString,
516 consume_group: CheetahString,
517 q_id: i32,
518 ack_offset: i64,
519 pop_time: i64,
520 invisible_time: i64,
521 channel: &Channel,
522 response: &mut RemotingCommand,
523 ) {
524 let lock_key = CheetahString::from_string(format!(
525 "{}{}{}{}{}",
526 &topic,
527 PopAckConstants::SPLIT,
528 &consume_group,
529 PopAckConstants::SPLIT,
530 q_id
531 ));
532 let old_offset = self
533 .broker_runtime_inner
534 .consumer_offset_manager()
535 .query_offset(&consume_group, &topic, q_id);
536 if old_offset > ack_offset {
537 return;
538 }
539 while !self
540 .pop_message_processor
541 .queue_lock_manager()
542 .try_lock_with_key(lock_key.clone())
543 .await
544 {
545 }
547 let old_offset = self
548 .broker_runtime_inner
549 .consumer_offset_manager()
550 .query_offset(&consume_group, &topic, q_id);
551 if old_offset > ack_offset {
552 return;
553 }
554 let next_offset = self
555 .broker_runtime_inner
556 .consumer_order_info_manager()
557 .commit_and_next(
558 &consume_group,
559 &topic,
560 q_id,
561 ack_offset as u64,
562 pop_time as u64,
563 );
564 match next_offset.cmp(&-1) {
565 Ordering::Less => {}
566 Ordering::Equal => {
567 let error_info = format!(
568 "offset is illegal, key:{}, old:{}, commit:{}, next:{}, {}",
569 lock_key,
570 old_offset,
571 ack_offset,
572 next_offset,
573 channel.remote_address()
574 );
575 response.set_code_ref(ResponseCode::MessageIllegal);
576 response.set_remark_mut(error_info);
577 self.pop_message_processor
578 .queue_lock_manager()
579 .unlock_with_key(lock_key)
580 .await;
581 return;
582 }
583 Ordering::Greater => {
584 if !self
585 .broker_runtime_inner
586 .consumer_offset_manager()
587 .has_offset_reset(consume_group.as_str(), topic.as_str(), q_id)
588 {
589 self.broker_runtime_inner
590 .consumer_offset_manager()
591 .commit_offset(
592 channel.remote_address().to_string().into(),
593 &consume_group,
594 &topic,
595 q_id,
596 next_offset,
597 );
598 }
599
600 if !self
601 .broker_runtime_inner
602 .consumer_order_info_manager()
603 .check_block(
604 &CheetahString::empty(),
605 &consume_group,
606 &topic,
607 q_id,
608 invisible_time as u64,
609 )
610 {
611 self.pop_message_processor.notify_message_arriving(
612 &topic,
613 q_id,
614 &consume_group,
615 );
616 }
617 }
618 }
619 self.pop_message_processor
620 .queue_lock_manager()
621 .unlock_with_key(lock_key)
622 .await;
623 self.broker_runtime_inner
624 .pop_inflight_message_counter()
625 .decrement_in_flight_message_num(&topic, &consume_group, pop_time, q_id, 1);
626 }
627
628 pub fn shutdown(&mut self) {
629 for pop_revive_service in self.pop_revive_services.iter_mut() {
630 pop_revive_service.shutdown();
631 }
632 }
633}