rocketmq_broker/processor/
ack_message_processor.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::cmp::Ordering;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::common::key_builder::POP_ORDER_REVIVE_QUEUE;
21use rocketmq_common::common::message::message_decoder;
22use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
23use rocketmq_common::common::message::MessageConst;
24use rocketmq_common::common::message::MessageTrait;
25use rocketmq_common::common::mix_all::MASTER_ID;
26use rocketmq_common::common::pop_ack_constants::PopAckConstants;
27use rocketmq_common::common::FAQUrl;
28use rocketmq_common::TimeUtils::get_current_millis;
29use rocketmq_error::RocketMQResult;
30use rocketmq_remoting::code::request_code::RequestCode;
31use rocketmq_remoting::code::response_code::ResponseCode;
32use rocketmq_remoting::net::channel::Channel;
33use rocketmq_remoting::protocol::body::batch_ack::BatchAck;
34use rocketmq_remoting::protocol::body::batch_ack_message_request_body::BatchAckMessageRequestBody;
35use rocketmq_remoting::protocol::header::ack_message_request_header::AckMessageRequestHeader;
36use rocketmq_remoting::protocol::header::extra_info_util::ExtraInfoUtil;
37use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
38use rocketmq_remoting::protocol::RemotingDeserializable;
39use rocketmq_remoting::protocol::RemotingSerializable;
40use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
41use rocketmq_rust::ArcMut;
42use rocketmq_store::base::message_status_enum::PutMessageStatus;
43use rocketmq_store::base::message_store::MessageStore;
44use rocketmq_store::pop::ack_msg::AckMsg;
45use rocketmq_store::pop::batch_ack_msg::BatchAckMsg;
46use rocketmq_store::pop::AckMessage;
47use tracing::error;
48use tracing::warn;
49
50use crate::broker_runtime::BrokerRuntimeInner;
51use crate::processor::pop_message_processor::PopMessageProcessor;
52use crate::processor::processor_service::pop_revive_service::PopReviveService;
53
54pub struct AckMessageProcessor<MS: MessageStore> {
55    pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
56    broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
57    revive_topic: CheetahString,
58    pop_revive_services: Vec<ArcMut<PopReviveService<MS>>>,
59}
60
61impl<MS> AckMessageProcessor<MS>
62where
63    MS: MessageStore,
64{
65    pub fn new(
66        broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
67        pop_message_processor: ArcMut<PopMessageProcessor<MS>>,
68    ) -> AckMessageProcessor<MS> {
69        let revive_topic = CheetahString::from_string(PopAckConstants::build_cluster_revive_topic(
70            broker_runtime_inner
71                .broker_config()
72                .broker_identity
73                .broker_cluster_name
74                .as_str(),
75        ));
76        let mut pop_revive_services = vec![];
77        let is_run_pop_revive = broker_runtime_inner
78            .broker_config()
79            .broker_identity
80            .broker_id
81            == MASTER_ID;
82
83        // each PopReviveService handles one revive topic's revive queue
84        for i in 0..broker_runtime_inner.broker_config().revive_queue_num {
85            let mut pop_revive_service =
86                PopReviveService::new(revive_topic.clone(), i as i32, broker_runtime_inner.clone());
87            pop_revive_service.set_should_run_pop_revive(is_run_pop_revive);
88            pop_revive_services.push(ArcMut::new(pop_revive_service));
89        }
90        AckMessageProcessor {
91            pop_message_processor,
92            broker_runtime_inner,
93            revive_topic,
94            pop_revive_services,
95        }
96    }
97
98    pub async fn process_request(
99        &mut self,
100        channel: Channel,
101        ctx: ConnectionHandlerContext,
102        request_code: RequestCode,
103        request: RemotingCommand,
104    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
105        match request_code {
106            RequestCode::AckMessage => self.process_ack(channel, ctx, request, true).await,
107            RequestCode::BatchAckMessage => {
108                self.process_batch_ack(channel, ctx, request, true).await
109            }
110            _ => {
111                error!(
112                    "AckMessageProcessor failed to process RequestCode: {}, consumer: {} ",
113                    request_code.to_i32(),
114                    channel.remote_address()
115                );
116                Ok(Some(
117                    RemotingCommand::create_response_command_with_code_remark(
118                        ResponseCode::MessageIllegal,
119                        format!(
120                            "AckMessageProcessor failed to process RequestCode: {request_code:?}",
121                        ),
122                    ),
123                ))
124            }
125        }
126    }
127
128    pub fn start(&mut self) {
129        for pop_revive_service in self.pop_revive_services.iter() {
130            PopReviveService::start(pop_revive_service.clone());
131        }
132    }
133
134    pub fn set_pop_revive_service_status(&mut self, status: bool) {
135        for pop_revive_service in self.pop_revive_services.iter_mut() {
136            pop_revive_service.set_should_run_pop_revive(status);
137        }
138    }
139}
140
141impl<MS> AckMessageProcessor<MS>
142where
143    MS: MessageStore,
144{
145    async fn process_ack(
146        &mut self,
147        channel: Channel,
148        _ctx: ConnectionHandlerContext,
149        request: RemotingCommand,
150        _broker_allow_suspend: bool,
151    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
152        let request_header = request.decode_command_custom_header::<AckMessageRequestHeader>()?;
153        let topic_config = self
154            .broker_runtime_inner
155            .topic_config_manager()
156            .select_topic_config(&request_header.topic);
157        if topic_config.is_none() {
158            error!(
159                "topic[{}] not exist, consumer: {},apply first please! {}",
160                request_header.topic,
161                channel.remote_address(),
162                FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
163            );
164            return Ok(Some(
165                RemotingCommand::create_response_command_with_code_remark(
166                    ResponseCode::TopicNotExist,
167                    format!(
168                        "topic[{}] not exist, apply first please! {}",
169                        request_header.topic,
170                        FAQUrl::suggest_todo(FAQUrl::APPLY_TOPIC_URL)
171                    ),
172                ),
173            ));
174        }
175        let topic_config = topic_config.unwrap();
176        if request_header.queue_id >= topic_config.read_queue_nums as i32
177            || request_header.queue_id < 0
178        {
179            let error_msg = format!(
180                "queueId{}] is illegal, topic:[{}] topicConfig.readQueueNums:[{}] consumer:[{}]",
181                request_header.queue_id,
182                request_header.topic,
183                topic_config.read_queue_nums,
184                channel.remote_address()
185            );
186            warn!("{}", error_msg);
187
188            return Ok(Some(
189                RemotingCommand::create_response_command_with_code_remark(
190                    ResponseCode::MessageIllegal,
191                    error_msg,
192                ),
193            ));
194        }
195        let message_store_inner = self.broker_runtime_inner.message_store().as_ref().unwrap();
196        let min_offset = message_store_inner
197            .get_min_offset_in_queue(&request_header.topic, request_header.queue_id);
198        let max_offset = message_store_inner
199            .get_max_offset_in_queue(&request_header.topic, request_header.queue_id);
200        if request_header.offset < min_offset || request_header.offset > max_offset {
201            let error_msg = format!(
202                "request offset not in queue offset range, request offset: {}, min offset: {}, \
203                 max offset: {}",
204                request_header.offset, min_offset, max_offset
205            );
206            warn!("{}", error_msg);
207
208            return Ok(Some(
209                RemotingCommand::create_response_command_with_code_remark(
210                    ResponseCode::NoMessage,
211                    error_msg,
212                ),
213            ));
214        }
215        let mut response = RemotingCommand::create_response_command();
216        self.append_ack(Some(request_header), &mut response, None, &channel, None)
217            .await?;
218        Ok(Some(response))
219    }
220
221    async fn process_batch_ack(
222        &mut self,
223        _channel: Channel,
224        _ctx: ConnectionHandlerContext,
225        request: RemotingCommand,
226        _broker_allow_suspend: bool,
227    ) -> rocketmq_error::RocketMQResult<Option<RemotingCommand>> {
228        if request.get_body().is_none() {
229            return Ok(Some(RemotingCommand::create_response_command_with_code(
230                ResponseCode::NoMessage,
231            )));
232        }
233        let req_body = BatchAckMessageRequestBody::decode(request.get_body().unwrap())?;
234        if req_body.acks.is_empty() {
235            return Ok(Some(RemotingCommand::create_response_command_with_code(
236                ResponseCode::NoMessage,
237            )));
238        }
239        let mut response = RemotingCommand::create_response_command();
240        let broker_name = &req_body.broker_name;
241        for ack in req_body.acks {
242            self.append_ack(None, &mut response, Some(ack), &_channel, Some(broker_name))
243                .await?;
244        }
245        Ok(Some(response))
246    }
247
248    async fn append_ack(
249        &mut self,
250        request_header: Option<AckMessageRequestHeader>,
251        response: &mut RemotingCommand,
252        batch_ack: Option<BatchAck>,
253        channel: &Channel,
254        broker_name: Option<&CheetahString>,
255    ) -> RocketMQResult<()> {
256        //handle single ack
257        let (
258            consume_group,
259            topic,
260            qid,
261            r_qid,
262            start_offset,
263            ack_offset,
264            pop_time,
265            invisible_time,
266            ack_count,
267            mut ack_msg,
268            broker_name,
269        ) = if let Some(request_header) = request_header {
270            let extra_info = ExtraInfoUtil::split(request_header.extra_info.as_str());
271            let broker_name = ExtraInfoUtil::get_broker_name(extra_info.as_slice())?;
272            let consume_group = request_header.consumer_group;
273            let topic = request_header.topic;
274            let qid = request_header.queue_id;
275            let r_qid = ExtraInfoUtil::get_revive_qid(extra_info.as_slice())?;
276            let start_offset = ExtraInfoUtil::get_ck_queue_offset(extra_info.as_slice())?;
277            let ack_offset = request_header.offset;
278            let pop_time = ExtraInfoUtil::get_pop_time(extra_info.as_slice())?;
279            let invisible_time = ExtraInfoUtil::get_invisible_time(extra_info.as_slice())?;
280            if r_qid == POP_ORDER_REVIVE_QUEUE {
281                self.ack_orderly(
282                    topic,
283                    consume_group,
284                    qid,
285                    ack_offset,
286                    pop_time,
287                    invisible_time,
288                    channel,
289                    response,
290                )
291                .await;
292                return Ok(());
293            }
294            let ack = AckMsg::default();
295            let ack_count = 1;
296            (
297                consume_group,
298                topic,
299                qid,
300                r_qid,
301                start_offset,
302                ack_offset,
303                pop_time,
304                invisible_time,
305                ack_count,
306                Box::new(ack) as Box<dyn AckMessage + Send>,
307                CheetahString::from(broker_name),
308            )
309        } else {
310            //handle batch ack
311            let batch_ack = batch_ack.unwrap();
312            let consumer_group = batch_ack.consumer_group;
313            let topic = CheetahString::from(ExtraInfoUtil::get_real_topic_with_retry(
314                batch_ack.topic.as_str(),
315                consumer_group.as_str(),
316                batch_ack.retry.as_str(),
317            )?);
318            let qid = batch_ack.queue_id;
319            let r_qid = batch_ack.revive_queue_id;
320            let start_offset = batch_ack.start_offset;
321            let akc_offset = -1;
322            let pop_time = batch_ack.pop_time;
323            let invisible_time = batch_ack.invisible_time;
324            let message_store = self.broker_runtime_inner.message_store().as_ref().unwrap();
325            let min_offset = message_store.get_min_offset_in_queue(&topic, qid);
326            let max_offset = message_store.get_max_offset_in_queue(&topic, qid);
327            if min_offset == -1 || max_offset == -1 {
328                //error!("Illegal topic or queue found when batch ack {:?}", batch_ack);
329                return Ok(());
330            }
331
332            let mut batch_ack_msg = BatchAckMsg::default();
333
334            let bit_set = &batch_ack.bit_set.0;
335            for i in bit_set.iter_ones() {
336                if i == usize::MAX {
337                    break;
338                }
339                let offset = batch_ack.start_offset + i as i64;
340                if offset < min_offset || offset > max_offset {
341                    continue;
342                }
343                if r_qid == POP_ORDER_REVIVE_QUEUE {
344                    self.ack_orderly(
345                        topic.clone(),
346                        consumer_group.clone(),
347                        qid,
348                        offset,
349                        pop_time,
350                        invisible_time,
351                        channel,
352                        response,
353                    )
354                    .await;
355                } else {
356                    batch_ack_msg.ack_offset_list.push(offset);
357                }
358            }
359            if r_qid == POP_ORDER_REVIVE_QUEUE || batch_ack_msg.ack_offset_list.is_empty() {
360                return Ok(());
361            }
362            let ack_count = batch_ack_msg.ack_offset_list.len();
363            (
364                consumer_group,
365                topic,
366                qid,
367                r_qid,
368                start_offset,
369                akc_offset,
370                pop_time,
371                invisible_time,
372                ack_count,
373                Box::new(batch_ack_msg) as Box<dyn AckMessage + Send>,
374                broker_name.unwrap().clone(),
375            )
376        };
377
378        //this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
379        //this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,topic,
380        // ackCount);
381        ack_msg.set_consumer_group(consume_group.clone());
382        ack_msg.set_topic(topic.clone());
383        ack_msg.set_queue_id(qid);
384        ack_msg.set_start_offset(start_offset);
385        ack_msg.set_ack_offset(ack_offset);
386        ack_msg.set_pop_time(pop_time);
387        ack_msg.set_broker_name(broker_name);
388        if self
389            .pop_message_processor
390            .pop_buffer_merge_service_mut()
391            .add_ack(r_qid, ack_msg.as_ref())
392        {
393            self.broker_runtime_inner
394                .pop_inflight_message_counter()
395                .decrement_in_flight_message_num(
396                    &topic,
397                    &consume_group,
398                    pop_time,
399                    qid,
400                    ack_count as i64,
401                );
402            return Ok(());
403        }
404        let mut inner = MessageExtBrokerInner::default();
405        inner.set_topic(self.revive_topic.clone());
406        inner.message_ext_inner.queue_id = qid;
407        if let Some(batch_ack) = ack_msg.as_any().downcast_ref::<BatchAckMsg>() {
408            inner.set_body(batch_ack.encode()?.into());
409            inner.set_tags(CheetahString::from_static_str(
410                PopAckConstants::BATCH_ACK_TAG,
411            ));
412            inner.put_property(
413                CheetahString::from_static_str(
414                    MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
415                ),
416                CheetahString::from(PopMessageProcessor::<MS>::gen_batch_ack_unique_id(
417                    batch_ack,
418                )),
419            );
420        } else if let Some(ack_msg) = ack_msg.as_any().downcast_ref::<AckMsg>() {
421            inner.set_body(ack_msg.encode()?.into());
422            inner.set_tags(CheetahString::from_static_str(PopAckConstants::ACK_TAG));
423            inner.put_property(
424                CheetahString::from_static_str(
425                    MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
426                ),
427                CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
428                    ack_msg as &dyn AckMessage,
429                )),
430            );
431        }
432        inner.message_ext_inner.born_timestamp = get_current_millis() as i64;
433        inner.message_ext_inner.store_host = self.broker_runtime_inner.store_host();
434        inner.message_ext_inner.born_host = self.broker_runtime_inner.store_host();
435        inner.set_delay_time_ms((pop_time + invisible_time) as u64);
436        inner.put_property(
437            CheetahString::from_static_str(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
438            CheetahString::from(PopMessageProcessor::<MS>::gen_ack_unique_id(
439                ack_msg.as_ref(),
440            )),
441        );
442        inner.properties_string =
443            message_decoder::message_properties_to_string(inner.get_properties());
444        let put_message_result = self
445            .broker_runtime_inner
446            .escape_bridge_mut()
447            .put_message_to_specific_queue(inner)
448            .await;
449        if !matches!(
450            put_message_result.put_message_status(),
451            PutMessageStatus::PutOk
452                | PutMessageStatus::FlushDiskTimeout
453                | PutMessageStatus::FlushSlaveTimeout
454                | PutMessageStatus::SlaveNotAvailable
455        ) {
456            error!(
457                "put ack msg error:{:?}",
458                put_message_result.put_message_status()
459            );
460        }
461        self.broker_runtime_inner
462            .pop_inflight_message_counter()
463            .decrement_in_flight_message_num(
464                &topic,
465                &consume_group,
466                pop_time,
467                qid,
468                ack_count as i64,
469            );
470        Ok(())
471    }
472
473    async fn ack_orderly(
474        &mut self,
475        topic: CheetahString,
476        consume_group: CheetahString,
477        q_id: i32,
478        ack_offset: i64,
479        pop_time: i64,
480        invisible_time: i64,
481        channel: &Channel,
482        response: &mut RemotingCommand,
483    ) {
484        let lock_key = CheetahString::from_string(format!(
485            "{}{}{}{}{}",
486            &topic,
487            PopAckConstants::SPLIT,
488            &consume_group,
489            PopAckConstants::SPLIT,
490            q_id
491        ));
492        let old_offset = self
493            .broker_runtime_inner
494            .consumer_offset_manager()
495            .query_offset(&consume_group, &topic, q_id);
496        if old_offset > ack_offset {
497            return;
498        }
499        while !self
500            .pop_message_processor
501            .queue_lock_manager()
502            .try_lock_with_key(lock_key.clone())
503            .await
504        {
505            //nothing to do
506        }
507        let old_offset = self
508            .broker_runtime_inner
509            .consumer_offset_manager()
510            .query_offset(&consume_group, &topic, q_id);
511        if old_offset > ack_offset {
512            return;
513        }
514        let next_offset = self
515            .broker_runtime_inner
516            .consumer_order_info_manager()
517            .commit_and_next(
518                &consume_group,
519                &topic,
520                q_id,
521                ack_offset as u64,
522                pop_time as u64,
523            );
524        match next_offset.cmp(&-1) {
525            Ordering::Less => {}
526            Ordering::Equal => {
527                let error_info = format!(
528                    "offset is illegal, key:{}, old:{}, commit:{}, next:{}, {}",
529                    lock_key,
530                    old_offset,
531                    ack_offset,
532                    next_offset,
533                    channel.remote_address()
534                );
535                response.set_code_ref(ResponseCode::MessageIllegal);
536                response.set_remark_mut(error_info);
537                self.pop_message_processor
538                    .queue_lock_manager()
539                    .unlock_with_key(lock_key)
540                    .await;
541                return;
542            }
543            Ordering::Greater => {
544                if !self
545                    .broker_runtime_inner
546                    .consumer_offset_manager()
547                    .has_offset_reset(consume_group.as_str(), topic.as_str(), q_id)
548                {
549                    self.broker_runtime_inner
550                        .consumer_offset_manager()
551                        .commit_offset(
552                            channel.remote_address().to_string().into(),
553                            &consume_group,
554                            &topic,
555                            q_id,
556                            next_offset,
557                        );
558                }
559
560                if !self
561                    .broker_runtime_inner
562                    .consumer_order_info_manager()
563                    .check_block(
564                        &CheetahString::empty(),
565                        &consume_group,
566                        &topic,
567                        q_id,
568                        invisible_time as u64,
569                    )
570                {
571                    self.pop_message_processor.notify_message_arriving(
572                        &topic,
573                        q_id,
574                        &consume_group,
575                    );
576                }
577            }
578        }
579        self.pop_message_processor
580            .queue_lock_manager()
581            .unlock_with_key(lock_key)
582            .await;
583        self.broker_runtime_inner
584            .pop_inflight_message_counter()
585            .decrement_in_flight_message_num(&topic, &consume_group, pop_time, q_id, 1);
586    }
587
588    pub fn shutdown(&mut self) {
589        for pop_revive_service in self.pop_revive_services.iter_mut() {
590            pop_revive_service.shutdown();
591        }
592    }
593}