rocketmq_broker/processor/
ack_message_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::cmp::Ordering;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE;
21use rocketmq_common::common::message::message_decoder;
22use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
23use rocketmq_common::common::message::MessageConst;
24use rocketmq_common::common::message::MessageTrait;
25use rocketmq_common::common::mix_all::MASTER_ID;
26use rocketmq_common::common::pop_ack_constants::PopAckConstants;
27use rocketmq_common::common::FAQUrl;
28use rocketmq_common::TimeUtils::get_current_millis;
29use rocketmq_error::RocketMQResult;
30use rocketmq_remoting::code::request_code::RequestCode;
31use rocketmq_remoting::code::response_code::ResponseCode;
32use rocketmq_remoting::net::channel::Channel;
33use rocketmq_remoting::protocol::body::batch_ack::BatchAck;
34use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody;
35use rocketmq_remoting::protocol::header::ack_message_request_header::AckMessageRequestHeader;
36use rocketmq_remoting::protocol::header::extra_info_util::ExtraInfoUtil;
37use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
38use rocketmq_remoting::protocol::RemotingDeserializable;
39use rocketmq_remoting::protocol::RemotingSerializable;
40use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
41use rocketmq_remoting::runtime::processor::RequestProcessor;
42use rocketmq_rust::ArcMut;
43use rocketmq_store::base::message_status_enum::PutMessageStatus;
44use rocketmq_store::base::message_store::MessageStore;
45use rocketmq_store::pop::ack_msg::AckMsg;
46use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
47use rocketmq_store::pop::AckMessage;
48use tracing::error;
49use tracing::info;
50use tracing::warn;
51
52use crate::broker_runtime::BrokerRuntimeInner;
53use crate::processor::pop_message_processor::PopMessageProcessor;
54use crate::processor::processor_service::pop_revive_service::PopReviveService;
55
56pub struct AckMessageProcessor<MS: MessageStore> {
57    pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
58    broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
59    revive_topic: CheetahString,
60    pop_revive_services: Vec<ArcMut<PopReviveService<MS>>>,
61}
62
63impl<MS> RequestProcessor for AckMessageProcessor<MS>
64where
65    MS: MessageStore,
66{
67    async fn process_request(
68        &mut self,
69        channel: Channel,
70        ctx: ConnectionHandlerContext,
71        request: &mut RemotingCommand,
72    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
73        let request_code = RequestCode::from(request.code());
74        info!(
75            "AckMessageProcessor received request code: {:?}",
76            request_code
77        );
78        match request_code {
79            RequestCode::AckMessage | RequestCode::BatchAckMessage => {
80                self.process_request_inner(channel, ctx, request_code, request)
81                    .await
82            }
83            _ => {
84                warn!(
85                    "AckMessageProcessor received unknown request code: {:?}",
86                    request_code
87                );
88                let response = RemotingCommand::create_response_command_with_code_remark(
89                    ResponseCode::RequestCodeNotSupported,
90                    format!(
91                        "AckMessageProcessor request code {} not supported",
92                        request.code()
93                    ),
94                );
95                Ok(Some(response.set_opaque(request.opaque())))
96            }
97        }
98    }
99}
100
101impl<MS> AckMessageProcessor<MS>
102where
103    MS: MessageStore,
104{
105    pub fn new(
106        broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
107        pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
108    ) -> AckMessageProcessor<MS> {
109        let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
110            broker_runtime_inner
111                .broker_config()
112                .broker_identity
113                .broker_cluster_name
114                .as_str(),
115        ));
116        let mut pop_revive_services = vec![];
117        let is_run_pop_revive = broker_runtime_inner
118            .broker_config()
119            .broker_identity
120            .broker_id
121            == MASTER_ID;
122
123        // each PopReviveService handles one revive topic's revive queue
124        for i in 0..broker_runtime_inner.broker_config().revive_queue_num {
125            let mut pop_revive_service =
126                PopReviveService::new(revive_topic.clone(), i as i32, broker_runtime_inner.clone());
127            pop_revive_service.set_should_run_pop_revive(is_run_pop_revive);
128            pop_revive_services.push(ArcMut::new(pop_revive_service));
129        }
130        AckMessageProcessor {
131            pop_message_processor,
132            broker_runtime_inner,
133            revive_topic,
134            pop_revive_services,
135        }
136    }
137
138    pub async fn process_request_inner(
139        &mut self,
140        channel: Channel,
141        ctx: ConnectionHandlerContext,
142        request_code: RequestCode,
143        request: &mut RemotingCommand,
144    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
145        match request_code {
146            RequestCode::AckMessage => self.process_ack(channel, ctx, request, true).await,
147            RequestCode::BatchAckMessage => {
148                self.process_batch_ack(channel, ctx, request, true).await
149            }
150            _ => {
151                error!(
152                    "AckMessageProcessor failed to process RequestCode: {}, consumer: {} ",
153                    request_code.to_i32(),
154                    channel.remote_address()
155                );
156                Ok(Some(
157                    RemotingCommand::create_response_command_with_code_remark(
158                        ResponseCode::MessageIllegal,
159                        format!(
160                            "AckMessageProcessor failed to process RequestCode: {request_code:?}",
161                        ),
162                    ),
163                ))
164            }
165        }
166    }
167
168    pub fn start(&mut self) {
169        for pop_revive_service in self.pop_revive_services.iter() {
170            PopReviveService::start(pop_revive_service.clone());
171        }
172    }
173
174    pub fn set_pop_revive_service_status(&mut self, status: bool) {
175        for pop_revive_service in self.pop_revive_services.iter_mut() {
176            pop_revive_service.set_should_run_pop_revive(status);
177        }
178    }
179}
180
181impl<MS> AckMessageProcessor<MS>
182where
183    MS: MessageStore,
184{
185    async fn process_ack(
186        &mut self,
187        channel: Channel,
188        _ctx: ConnectionHandlerContext,
189        request: &mut RemotingCommand,
190        _broker_allow_suspend: bool,
191    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
192        let request_header = request.decode_command_custom_header::<AckMessageRequestHeader>()?;
193        let topic_config = self
194            .broker_runtime_inner
195            .topic_config_manager()
196            .select_topic_config(&request_header.topic);
197        if topic_config.is_none() {
198            error!(
199                "topic[{}] not exist, consumer: {},apply first please! {}",
200                request_header.topic,
201                channel.remote_address(),
202                FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
203            );
204            return Ok(Some(
205                RemotingCommand::create_response_command_with_code_remark(
206                    ResponseCode::TopicNotExist,
207                    format!(
208                        "topic[{}] not exist, apply first please! {}",
209                        request_header.topic,
210                        FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
211                    ),
212                ),
213            ));
214        }
215        let topic_config = topic_config.unwrap();
216        if request_header.queue_id >= topic_config.read_queue_nums as i32
217            || request_header.queue_id < 0
218        {
219            let error_msg = format!(
220                "queueId{}] is illegal, topic:[{}] topicConfig.readQueueNums:[{}] consumer:[{}]",
221                request_header.queue_id,
222                request_header.topic,
223                topic_config.read_queue_nums,
224                channel.remote_address()
225            );
226            warn!("{}", error_msg);
227
228            return Ok(Some(
229                RemotingCommand::create_response_command_with_code_remark(
230                    ResponseCode::MessageIllegal,
231                    error_msg,
232                ),
233            ));
234        }
235        let message_store_inner = self.broker_runtime_inner.message_store().as_ref().unwrap();
236        let min_offset = message_store_inner
237            .get_min_offset_in_queue(&request_header.topic, request_header.queue_id);
238        let max_offset = message_store_inner
239            .get_max_offset_in_queue(&request_header.topic, request_header.queue_id);
240        if request_header.offset < min_offset || request_header.offset > max_offset {
241            let error_msg = format!(
242                "request offset not in queue offset range, request offset: {}, min offset: {}, \
243                 max offset: {}",
244                request_header.offset, min_offset, max_offset
245            );
246            warn!("{}", error_msg);
247
248            return Ok(Some(
249                RemotingCommand::create_response_command_with_code_remark(
250                    ResponseCode::NoMessage,
251                    error_msg,
252                ),
253            ));
254        }
255        let mut response = RemotingCommand::create_response_command();
256        self.append_ack(Some(request_header), &mut response, None, &channel, None)
257            .await?;
258        Ok(Some(response))
259    }
260
261    async fn process_batch_ack(
262        &mut self,
263        _channel: Channel,
264        _ctx: ConnectionHandlerContext,
265        request: &mut RemotingCommand,
266        _broker_allow_suspend: bool,
267    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
268        if request.get_body().is_none() {
269            return Ok(Some(RemotingCommand::create_response_command_with_code(
270                ResponseCode::NoMessage,
271            )));
272        }
273        let req_body = BatchAckMessageRequestBody::decode(request.get_body().unwrap())?;
274        if req_body.acks.is_empty() {
275            return Ok(Some(RemotingCommand::create_response_command_with_code(
276                ResponseCode::NoMessage,
277            )));
278        }
279        let mut response = RemotingCommand::create_response_command();
280        let broker_name = &req_body.broker_name;
281        for ack in req_body.acks {
282            self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name))
283                .await?;
284        }
285        Ok(Some(response))
286    }
287
288    async fn append_ack(
289        &mut self,
290        request_header: Option<AckMessageRequestHeader>,
291        response: &mut RemotingCommand,
292        batch_ack: Option<BatchAck>,
293        channel: &Channel,
294        broker_name: Option<&CheetahString>,
295    ) -> RocketMQResult<()> {
296        //handle single ack
297        let (
298            consume_group,
299            topic,
300            qid,
301            r_qid,
302            start_offset,
303            ack_offset,
304            pop_time,
305            invisible_time,
306            ack_count,
307            mut ack_msg,
308            broker_name,
309        ) = if let Some(request_header) = request_header {
310            let extra_info = ExtraInfoUtil::split(request_header.extra_info.as_str());
311            let broker_name = ExtraInfoUtil::get_broker_name(extra_info.as_slice())?;
312            let consume_group = request_header.consumer_group;
313            let topic = request_header.topic;
314            let qid = request_header.queue_id;
315            let r_qid = ExtraInfoUtil::get_revive_qid(extra_info.as_slice())?;
316            let start_offset = ExtraInfoUtil::get_ck_queue_offset(extra_info.as_slice())?;
317            let ack_offset = request_header.offset;
318            let pop_time = ExtraInfoUtil::get_pop_time(extra_info.as_slice())?;
319            let invisible_time = ExtraInfoUtil::get_invisible_time(extra_info.as_slice())?;
320            if r_qid == POP_ORDER_REVIVE_QUEUE {
321                self.ack_orderly(
322                    topic,
323                    consume_group,
324                    qid,
325                    ack_offset,
326                    pop_time,
327                    invisible_time,
328                    channel,
329                    response,
330                )
331                .await;
332                return Ok(());
333            }
334            let ack = AckMsg::default();
335            let ack_count = 1;
336            (
337                consume_group,
338                topic,
339                qid,
340                r_qid,
341                start_offset,
342                ack_offset,
343                pop_time,
344                invisible_time,
345                ack_count,
346                Box::new(ack) as Box<dyn AckMessage + Send>,
347                CheetahString::from(broker_name),
348            )
349        } else {
350            //handle batch ack
351            let batch_ack = batch_ack.unwrap();
352            let consumer_group = batch_ack.consumer_group;
353            let topic = CheetahString::from(ExtraInfoUtil::get_real_topic_with_retry(
354                batch_ack.topic.as_str(),
355                consumer_group.as_str(),
356                batch_ack.retry.as_str(),
357            )?);
358            let qid = batch_ack.queue_id;
359            let r_qid = batch_ack.revive_queue_id;
360            let start_offset = batch_ack.start_offset;
361            let akc_offset = -1;
362            let pop_time = batch_ack.pop_time;
363            let invisible_time = batch_ack.invisible_time;
364            let message_store = self.broker_runtime_inner.message_store().as_ref().unwrap();
365            let min_offset = message_store.get_min_offset_in_queue(&topic, qid);
366            let max_offset = message_store.get_max_offset_in_queue(&topic, qid);
367            if min_offset == -1 || max_offset == -1 {
368                //error!("Illegal topic or queue found when batch ack {:?}", batch_ack);
369                return Ok(());
370            }
371
372            let mut batch_ack_msg = BatchAckMsg::default();
373
374            let bit_set = &batch_ack.bit_set.0;
375            for i in bit_set.iter_ones() {
376                if i == usize::MAX {
377                    break;
378                }
379                let offset = batch_ack.start_offset + i as i64;
380                if offset < min_offset || offset > max_offset {
381                    continue;
382                }
383                if r_qid == POP_ORDER_REVIVE_QUEUE {
384                    self.ack_orderly(
385                        topic.clone(),
386                        consumer_group.clone(),
387                        qid,
388                        offset,
389                        pop_time,
390                        invisible_time,
391                        channel,
392                        response,
393                    )
394                    .await;
395                } else {
396                    batch_ack_msg.ack_offset_list.push(offset);
397                }
398            }
399            if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() {
400                return Ok(());
401            }
402            let ack_count = batch_ack_msg.ack_offset_list.len();
403            (
404                consumer_group,
405                topic,
406                qid,
407                r_qid,
408                start_offset,
409                akc_offset,
410                pop_time,
411                invisible_time,
412                ack_count,
413                Box::new(batch_ack_msg) as Box<dyn AckMessage + Send>,
414                broker_name.unwrap().clone(),
415            )
416        };
417
418        //this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
419        //this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,topic,
420        // ackCount);
421        ack_msg.set_consumer_group(consume_group.clone());
422        ack_msg.set_topic(topic.clone());
423        ack_msg.set_queue_id(qid);
424        ack_msg.set_start_offset(start_offset);
425        ack_msg.set_ack_offset(ack_offset);
426        ack_msg.set_pop_time(pop_time);
427        ack_msg.set_broker_name(broker_name);
428        if self
429            .pop_message_processor
430            .pop_buffer_merge_service_mut()
431            .add_ack(r_qid, ack_msg.as_ref())
432        {
433            self.broker_runtime_inner
434                .pop_inflight_message_counter()
435                .decrement_in_flight_message_num(
436                    &topic,
437                    &consume_group,
438                    pop_time,
439                    qid,
440                    ack_count as i64,
441                );
442            return Ok(());
443        }
444        let mut inner = MessageExtBrokerInner::default();
445        inner.set_topic(self.revive_topic.clone());
446        inner.message_ext_inner.queue_id = qid;
447        if let Some(batch_ack) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
448            inner.set_body(batch_ack.encode()?.into());
449            inner.set_tags(CheetahString::from_static_str(
450                PopAckConstants::BATCH_ACK_TAG,
451            ));
452            inner.put_property(
453                CheetahString::from_static_str(
454                    MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
455                ),
456                CheetahString::from(PopMessageProcessor::<MS>::gen_batch_ack_unique_id(
457                    batch_ack,
458                )),
459            );
460        } else if let Some(ack_msg) = ack_msg.as_any().downcast_ref::<AckMsg>() {
461            inner.set_body(ack_msg.encode()?.into());
462            inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
463            inner.put_property(
464                CheetahString::from_static_str(
465                    MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
466                ),
467                CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
468                    ack_msg as &dyn AckMessage,
469                )),
470            );
471        }
472        inner.message_ext_inner.born_timestamp = get_current_millis() as i64;
473        inner.message_ext_inner.store_host = self.broker_runtime_inner.store_host();
474        inner.message_ext_inner.born_host = self.broker_runtime_inner.store_host();
475        inner.set_delay_time_ms((pop_time + invisible_time) as u64);
476        inner.put_property(
477            CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
478            CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
479                ack_msg.as_ref(),
480            )),
481        );
482        inner.properties_string =
483            message_decoder::message_properties_to_string(inner.get_properties());
484        let put_message_result = self
485            .broker_runtime_inner
486            .escape_bridge_mut()
487            .put_message_to_specific_queue(inner)
488            .await;
489        if !matches!(
490            put_message_result.put_message_status(),
491            PutMessageStatus::PutOk
492                | PutMessageStatus::FlushDiskTimeout
493                | PutMessageStatus::FlushSlaveTimeout
494                | PutMessageStatus::SlaveNotAvailable
495        ) {
496            error!(
497                "put ack msg error:{:?}",
498                put_message_result.put_message_status()
499            );
500        }
501        self.broker_runtime_inner
502            .pop_inflight_message_counter()
503            .decrement_in_flight_message_num(
504                &topic,
505                &consume_group,
506                pop_time,
507                qid,
508                ack_count as i64,
509            );
510        Ok(())
511    }
512
513    async fn ack_orderly(
514        &mut self,
515        topic: CheetahString,
516        consume_group: CheetahString,
517        q_id: i32,
518        ack_offset: i64,
519        pop_time: i64,
520        invisible_time: i64,
521        channel: &Channel,
522        response: &mut RemotingCommand,
523    ) {
524        let lock_key = CheetahString::from_string(format!(
525            "{}{}{}{}{}",
526            &topic,
527            PopAckConstants::SPLIT,
528            &consume_group,
529            PopAckConstants::SPLIT,
530            q_id
531        ));
532        let old_offset = self
533            .broker_runtime_inner
534            .consumer_offset_manager()
535            .query_offset(&consume_group, &topic, q_id);
536        if old_offset > ack_offset {
537            return;
538        }
539        while !self
540            .pop_message_processor
541            .queue_lock_manager()
542            .try_lock_with_key(lock_key.clone())
543            .await
544        {
545            //nothing to do
546        }
547        let old_offset = self
548            .broker_runtime_inner
549            .consumer_offset_manager()
550            .query_offset(&consume_group, &topic, q_id);
551        if old_offset > ack_offset {
552            return;
553        }
554        let next_offset = self
555            .broker_runtime_inner
556            .consumer_order_info_manager()
557            .commit_and_next(
558                &consume_group,
559                &topic,
560                q_id,
561                ack_offset as u64,
562                pop_time as u64,
563            );
564        match next_offset.cmp(&-1) {
565            Ordering::Less => {}
566            Ordering::Equal => {
567                let error_info = format!(
568                    "offset is illegal, key:{}, old:{}, commit:{}, next:{}, {}",
569                    lock_key,
570                    old_offset,
571                    ack_offset,
572                    next_offset,
573                    channel.remote_address()
574                );
575                response.set_code_ref(ResponseCode::MessageIllegal);
576                response.set_remark_mut(error_info);
577                self.pop_message_processor
578                    .queue_lock_manager()
579                    .unlock_with_key(lock_key)
580                    .await;
581                return;
582            }
583            Ordering::Greater => {
584                if !self
585                    .broker_runtime_inner
586                    .consumer_offset_manager()
587                    .has_offset_reset(consume_group.as_str(), topic.as_str(), q_id)
588                {
589                    self.broker_runtime_inner
590                        .consumer_offset_manager()
591                        .commit_offset(
592                            channel.remote_address().to_string().into(),
593                            &consume_group,
594                            &topic,
595                            q_id,
596                            next_offset,
597                        );
598                }
599
600                if !self
601                    .broker_runtime_inner
602                    .consumer_order_info_manager()
603                    .check_block(
604                        &CheetahString::empty(),
605                        &consume_group,
606                        &topic,
607                        q_id,
608                        invisible_time as u64,
609                    )
610                {
611                    self.pop_message_processor.notify_message_arriving(
612                        &topic,
613                        q_id,
614                        &consume_group,
615                    );
616                }
617            }
618        }
619        self.pop_message_processor
620            .queue_lock_manager()
621            .unlock_with_key(lock_key)
622            .await;
623        self.broker_runtime_inner
624            .pop_inflight_message_counter()
625            .decrement_in_flight_message_num(&topic, &consume_group, pop_time, q_id, 1);
626    }
627
628    pub fn shutdown(&mut self) {
629        for pop_revive_service in self.pop_revive_services.iter_mut() {
630            pop_revive_service.shutdown();
631        }
632    }
633}