Skip to main content

rocketmq_client_rust/consumer/consumer_impl/
process_queue.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::AtomicI64;
18use std::sync::atomic::AtomicU64;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21use std::sync::LazyLock;
22
23use cheetah_string::CheetahString;
24use rocketmq_common::common::message::message_ext::MessageExt;
25use rocketmq_common::common::message::MessageConst;
26use rocketmq_common::common::message::MessageTrait;
27use rocketmq_common::MessageAccessor::MessageAccessor;
28use rocketmq_common::TimeUtils::current_millis;
29use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo;
30use rocketmq_rust::ArcMut;
31use tokio::sync::RwLock;
32use tracing::info;
33
34use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
35use crate::consumer::consumer_impl::PULL_MAX_IDLE_TIME;
36
/// Lock lifetime threshold in milliseconds, compared against the age of the last lock
/// timestamp by `ProcessQueue::is_lock_expired`.
///
/// Read once from the `rocketmq.client.rebalance.lockMaxLiveTime` environment variable;
/// falls back to `30000` when the variable is unset or does not parse as a `u64`.
pub static REBALANCE_LOCK_MAX_LIVE_TIME: LazyLock<u64> = LazyLock::new(|| {
    const DEFAULT_MILLIS: u64 = 30000;
    match std::env::var("rocketmq.client.rebalance.lockMaxLiveTime") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_MILLIS),
        Err(_) => DEFAULT_MILLIS,
    }
});
43
/// Lock interval value, read once from the `rocketmq.client.rebalance.lockInterval`
/// environment variable.
///
/// Falls back to `20000` when the variable is unset or does not parse as a `u64`.
pub static REBALANCE_LOCK_INTERVAL: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("rocketmq.client.rebalance.lockInterval")
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(20000)
});
50
51struct ProcessQueueStore {
52    msg_tree_map: BTreeMap<i64, ArcMut<MessageExt>>,
53    consuming_msg_orderly_tree_map: BTreeMap<i64, ArcMut<MessageExt>>,
54    queue_offset_max: i64,
55}
56
57impl ProcessQueueStore {
58    fn new() -> Self {
59        ProcessQueueStore {
60            msg_tree_map: BTreeMap::new(),
61            consuming_msg_orderly_tree_map: BTreeMap::new(),
62            queue_offset_max: 0,
63        }
64    }
65}
66
67pub struct ProcessQueue {
68    store: RwLock<ProcessQueueStore>,
69    pub(crate) consume_lock: Arc<RwLock<()>>,
70
71    msg_count: AtomicI64,
72    msg_size: AtomicU64,
73    msg_acc_cnt: AtomicI64,
74    try_unlock_times: AtomicI64,
75
76    dropped: AtomicBool,
77    locked: AtomicBool,
78    consuming: AtomicBool,
79
80    last_pull_timestamp: AtomicU64,
81    last_consume_timestamp: AtomicU64,
82    last_lock_timestamp: AtomicU64,
83}
84
85impl Default for ProcessQueue {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl ProcessQueue {
92    pub fn new() -> Self {
93        let now = current_millis();
94        ProcessQueue {
95            store: RwLock::new(ProcessQueueStore::new()),
96            consume_lock: Arc::new(RwLock::new(())),
97
98            msg_count: AtomicI64::new(0),
99            msg_size: AtomicU64::new(0),
100            msg_acc_cnt: AtomicI64::new(0),
101            try_unlock_times: AtomicI64::new(0),
102
103            dropped: AtomicBool::new(false),
104            locked: AtomicBool::new(false),
105            consuming: AtomicBool::new(false),
106
107            last_pull_timestamp: AtomicU64::new(now),
108            last_consume_timestamp: AtomicU64::new(now),
109            last_lock_timestamp: AtomicU64::new(now),
110        }
111    }
112}
113
114impl ProcessQueue {
    /// Stores `dropped` into the queue's dropped flag (release ordering).
    pub(crate) fn set_dropped(&self, dropped: bool) {
        self.dropped.store(dropped, Ordering::Release);
    }
118
    /// Returns the current value of the dropped flag (acquire ordering).
    pub(crate) fn is_dropped(&self) -> bool {
        self.dropped.load(Ordering::Acquire)
    }
122
    /// Returns the timestamp last written by `set_last_lock_timestamp`, or the creation
    /// time if it was never set.
    pub(crate) fn get_last_lock_timestamp(&self) -> u64 {
        self.last_lock_timestamp.load(Ordering::Acquire)
    }
126
    /// Stores `locked` into the queue's locked flag (release ordering).
    pub(crate) fn set_locked(&self, locked: bool) {
        self.locked.store(locked, Ordering::Release);
    }
130
131    pub(crate) fn is_pull_expired(&self) -> bool {
132        (current_millis() - self.last_pull_timestamp.load(Ordering::Acquire)) > *PULL_MAX_IDLE_TIME
133    }
134
135    pub(crate) fn is_lock_expired(&self) -> bool {
136        (current_millis() - self.last_lock_timestamp.load(Ordering::Acquire)) > *REBALANCE_LOCK_MAX_LIVE_TIME
137    }
138
    /// Atomically adds one to the try-unlock counter.
    pub(crate) fn inc_try_unlock_times(&self) {
        self.try_unlock_times.fetch_add(1, Ordering::AcqRel);
    }
142
143    pub(crate) async fn clean_expired_msg(&self, push_consumer: Option<ArcMut<DefaultMQPushConsumerImpl>>) {
144        let mut push_consumer = match push_consumer {
145            Some(pc) => pc,
146            None => return,
147        };
148
149        if push_consumer.is_consume_orderly() {
150            return;
151        }
152
153        let loop_ = 16.min(self.store.read().await.msg_tree_map.len());
154        for _ in 0..loop_ {
155            let msg = {
156                let store = self.store.read().await;
157                if let Some((_, value)) = store.msg_tree_map.first_key_value() {
158                    let consume_start_time_stamp = MessageAccessor::get_consume_start_time_stamp(value.as_ref());
159                    if let Some(ts_str) = consume_start_time_stamp {
160                        if let Ok(ts) = ts_str.parse::<u64>() {
161                            if current_millis() - ts > push_consumer.consumer_config.consume_timeout * 1000 * 60 {
162                                Some(value.clone())
163                            } else {
164                                None
165                            }
166                        } else {
167                            None
168                        }
169                    } else {
170                        None
171                    }
172                } else {
173                    None
174                }
175            };
176
177            let mut msg = match msg {
178                Some(m) => m,
179                None => break,
180            };
181
182            let msg_inner = msg.as_mut();
183            let topic = push_consumer.client_config.with_namespace(msg_inner.topic());
184            let queue_id = msg_inner.queue_id();
185            let msg_id = msg_inner.msg_id().to_string();
186            let store_host = format!("{:?}", msg_inner.store_host());
187            let queue_offset = msg_inner.queue_offset;
188            msg_inner.set_topic(topic);
189            let topic_name = msg_inner.topic().to_string();
190            let _ = push_consumer
191                .send_message_back_with_broker_name(msg_inner, 3, None, None)
192                .await;
193            info!(
194                "send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
195                topic_name, msg_id, store_host, queue_id, queue_offset
196            );
197
198            let should_remove = {
199                let store = self.store.read().await;
200                !store.msg_tree_map.is_empty() && queue_offset == *store.msg_tree_map.first_key_value().unwrap().0
201            };
202            if should_remove {
203                self.remove_message(&[msg]).await;
204            }
205        }
206    }
207
208    pub(crate) async fn put_message(&self, messages: Vec<ArcMut<MessageExt>>) -> bool {
209        let mut dispatch_to_consume = false;
210        let mut store = self.store.write().await;
211        let mut valid_msg_cnt = 0i64;
212
213        let acc_total = if let Some(last_msg) = messages.last() {
214            if let Some(property) =
215                last_msg.property(&CheetahString::from_static_str(MessageConst::PROPERTY_MAX_OFFSET))
216            {
217                property.parse::<i64>().unwrap_or(0) - last_msg.queue_offset
218            } else {
219                0
220            }
221        } else {
222            0
223        };
224
225        for message in messages {
226            if store
227                .msg_tree_map
228                .insert(message.queue_offset, message.clone())
229                .is_none()
230            {
231                valid_msg_cnt += 1;
232                if message.queue_offset > store.queue_offset_max {
233                    store.queue_offset_max = message.queue_offset;
234                }
235                let body_len = message.body().map_or(0, |b| b.len()) as u64;
236                self.msg_size.fetch_add(body_len, Ordering::AcqRel);
237            }
238        }
239        self.msg_count.fetch_add(valid_msg_cnt, Ordering::AcqRel);
240        if !store.msg_tree_map.is_empty() && !self.consuming.load(Ordering::Acquire) {
241            dispatch_to_consume = true;
242            self.consuming.store(true, Ordering::Release);
243        }
244        if acc_total > 0 {
245            self.msg_acc_cnt.store(acc_total, Ordering::Release);
246        }
247        dispatch_to_consume
248    }
249
250    pub(crate) async fn get_max_span(&self) -> u64 {
251        let store = self.store.read().await;
252        if store.msg_tree_map.is_empty() {
253            return 0;
254        }
255        let first = store.msg_tree_map.first_key_value().unwrap();
256        let last = store.msg_tree_map.last_key_value().unwrap();
257        (last.0 - first.0) as u64
258    }
259
260    pub(crate) async fn remove_message(&self, messages: &[ArcMut<MessageExt>]) -> i64 {
261        let now = current_millis();
262        let mut store = self.store.write().await;
263
264        self.last_consume_timestamp.store(now, Ordering::Release);
265
266        if store.msg_tree_map.is_empty() {
267            return -1;
268        }
269
270        let mut result = store.queue_offset_max + 1;
271        let mut removed_cnt = 0i64;
272
273        for message in messages {
274            if store.msg_tree_map.remove(&message.queue_offset).is_some() {
275                removed_cnt += 1;
276                let body_len = message.body().map_or(0, |b| b.len()) as u64;
277                if body_len > 0 {
278                    self.msg_size.fetch_sub(body_len, Ordering::AcqRel);
279                }
280            }
281        }
282
283        if removed_cnt > 0 {
284            let prev = self.msg_count.fetch_sub(removed_cnt, Ordering::AcqRel);
285            if prev == removed_cnt {
286                self.msg_size.store(0, Ordering::Release);
287            }
288        }
289
290        if !store.msg_tree_map.is_empty() {
291            result = *store.msg_tree_map.first_key_value().unwrap().0;
292        }
293
294        result
295    }
296
297    pub(crate) async fn rollback(&self) {
298        let mut store = self.store.write().await;
299        let mut consuming = std::mem::take(&mut store.consuming_msg_orderly_tree_map);
300        store.msg_tree_map.append(&mut consuming);
301    }
302
303    pub(crate) async fn commit(&self) -> i64 {
304        let mut store = self.store.write().await;
305        let offset = store
306            .consuming_msg_orderly_tree_map
307            .last_key_value()
308            .map_or(-1, |(k, _)| *k + 1);
309
310        let consumed_count = store.consuming_msg_orderly_tree_map.len() as i64;
311        let prev_count = self.msg_count.fetch_sub(consumed_count, Ordering::AcqRel);
312
313        if prev_count == consumed_count {
314            self.msg_size.store(0, Ordering::Release);
315        } else {
316            for message in store.consuming_msg_orderly_tree_map.values() {
317                let body_len = message.body().map_or(0, |b| b.len()) as u64;
318                if body_len > 0 {
319                    self.msg_size.fetch_sub(body_len, Ordering::AcqRel);
320                }
321            }
322        }
323        store.consuming_msg_orderly_tree_map.clear();
324        offset
325    }
326
327    pub(crate) async fn make_message_to_consume_again(&self, messages: &[ArcMut<MessageExt>]) {
328        let mut store = self.store.write().await;
329        for message in messages {
330            store.consuming_msg_orderly_tree_map.remove(&message.queue_offset);
331            store.msg_tree_map.insert(message.queue_offset, message.clone());
332        }
333    }
334
335    pub(crate) async fn take_messages(&self, batch_size: u32) -> Vec<ArcMut<MessageExt>> {
336        let mut messages = Vec::with_capacity(batch_size as usize);
337        let now = current_millis();
338        let mut store = self.store.write().await;
339
340        self.last_consume_timestamp.store(now, Ordering::Release);
341
342        for _ in 0..batch_size {
343            if let Some((offset, message)) = store.msg_tree_map.pop_first() {
344                store.consuming_msg_orderly_tree_map.insert(offset, message.clone());
345                messages.push(message);
346            } else {
347                break;
348            }
349        }
350
351        if messages.is_empty() {
352            self.consuming.store(false, Ordering::Release);
353        }
354
355        messages
356    }
357
358    pub(crate) async fn contains_message(&self, message_ext: &MessageExt) -> bool {
359        self.store
360            .read()
361            .await
362            .msg_tree_map
363            .contains_key(&message_ext.queue_offset)
364    }
365
366    pub(crate) async fn clear(&self) {
367        let mut store = self.store.write().await;
368        store.msg_tree_map.clear();
369        store.consuming_msg_orderly_tree_map.clear();
370        store.queue_offset_max = 0;
371        self.msg_count.store(0, Ordering::Release);
372        self.msg_size.store(0, Ordering::Release);
373    }
374
375    pub(crate) async fn fill_process_queue_info(&self, info: &mut ProcessQueueInfo) {
376        let store = self.store.read().await;
377
378        if !store.msg_tree_map.is_empty() {
379            if let Some((min_offset, _)) = store.msg_tree_map.first_key_value() {
380                info.cached_msg_min_offset = *min_offset as u64;
381            }
382            if let Some((max_offset, _)) = store.msg_tree_map.last_key_value() {
383                info.cached_msg_max_offset = *max_offset as u64;
384            }
385            info.cached_msg_count = store.msg_tree_map.len() as u32;
386        }
387
388        info.cached_msg_size_in_mib = (self.msg_size.load(Ordering::Acquire) / (1024 * 1024)) as u32;
389
390        if !store.consuming_msg_orderly_tree_map.is_empty() {
391            if let Some((min_offset, _)) = store.consuming_msg_orderly_tree_map.first_key_value() {
392                info.transaction_msg_min_offset = *min_offset as u64;
393            }
394            if let Some((max_offset, _)) = store.consuming_msg_orderly_tree_map.last_key_value() {
395                info.transaction_msg_max_offset = *max_offset as u64;
396            }
397            info.transaction_msg_count = store.consuming_msg_orderly_tree_map.len() as u32;
398        }
399
400        info.locked = self.locked.load(Ordering::Acquire);
401        info.try_unlock_times = self.try_unlock_times.load(Ordering::Acquire) as u64;
402        info.last_lock_timestamp = self.last_lock_timestamp.load(Ordering::Acquire);
403        info.droped = self.dropped.load(Ordering::Acquire);
404        info.last_pull_timestamp = self.last_pull_timestamp.load(Ordering::Acquire);
405        info.last_consume_timestamp = self.last_consume_timestamp.load(Ordering::Acquire);
406    }
407
408    /// Returns `Some((min_offset, max_offset))` of the pending message tree, or `None` if empty.
409    pub(crate) async fn get_offset_span(&self) -> Option<(i64, i64)> {
410        let store = self.store.read().await;
411        if store.msg_tree_map.is_empty() {
412            return None;
413        }
414        let min = *store.msg_tree_map.first_key_value().unwrap().0;
415        let max = *store.msg_tree_map.last_key_value().unwrap().0;
416        Some((min, max))
417    }
418
    /// Stores `last_pull_timestamp` as the time of the most recent pull; `is_pull_expired`
    /// measures idle time from this value.
    pub(crate) fn set_last_pull_timestamp(&self, last_pull_timestamp: u64) {
        self.last_pull_timestamp.store(last_pull_timestamp, Ordering::Release);
    }
422
    /// Stores `last_lock_timestamp` as the time of the most recent lock; `is_lock_expired`
    /// measures lock age from this value.
    pub(crate) fn set_last_lock_timestamp(&self, last_lock_timestamp: u64) {
        self.last_lock_timestamp.store(last_lock_timestamp, Ordering::Release);
    }
426
    /// Returns the number of cached messages; a negative internal counter is reported as `0`.
    pub fn msg_count(&self) -> u64 {
        // Clamp before the cast so a negative value cannot wrap into a huge unsigned number.
        self.msg_count.load(Ordering::Acquire).max(0) as u64
    }
430
    /// Returns the accumulated body size, in bytes, of the cached messages.
    pub(crate) fn msg_size(&self) -> u64 {
        self.msg_size.load(Ordering::Acquire)
    }
434
    /// Returns the current value of the locked flag (acquire ordering).
    pub(crate) fn is_locked(&self) -> bool {
        self.locked.load(Ordering::Acquire)
    }
438
439    pub(crate) async fn has_temp_message(&self) -> bool {
440        !self.store.read().await.msg_tree_map.is_empty()
441    }
442
    /// Returns the timestamp last written by `set_last_pull_timestamp`, or the creation
    /// time if it was never set.
    pub(crate) fn get_last_pull_timestamp(&self) -> u64 {
        self.last_pull_timestamp.load(Ordering::Acquire)
    }
446
    /// Returns the timestamp of the most recent `remove_message` or `take_messages` call,
    /// or the creation time if neither has run yet.
    pub(crate) fn get_last_consume_timestamp(&self) -> u64 {
        self.last_consume_timestamp.load(Ordering::Acquire)
    }
450
    /// Returns the value most recently recorded by `put_message` in `msg_acc_cnt`
    /// (zero until a batch with a positive difference has been stored).
    pub(crate) fn get_msg_acc_cnt(&self) -> i64 {
        self.msg_acc_cnt.load(Ordering::Acquire)
    }
454
    /// Returns how many times `inc_try_unlock_times` has been called.
    pub(crate) fn get_try_unlock_times(&self) -> i64 {
        self.try_unlock_times.load(Ordering::Acquire)
    }
458
    /// Returns the current value of the consuming flag (acquire ordering).
    pub(crate) fn is_consuming(&self) -> bool {
        self.consuming.load(Ordering::Acquire)
    }
462
    /// Stores `consuming` into the queue's consuming flag (release ordering).
    pub(crate) fn set_consuming(&self, consuming: bool) {
        self.consuming.store(consuming, Ordering::Release);
    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use bytes::Bytes;
472    use cheetah_string::CheetahString;
473    use rocketmq_common::common::message::MessageTrait;
474
475    fn create_test_messages(count: usize) -> Vec<ArcMut<MessageExt>> {
476        let mut messages = Vec::with_capacity(count);
477        for i in 0..count {
478            let mut msg = MessageExt {
479                queue_offset: i as i64,
480                ..Default::default()
481            };
482            msg.set_body(Bytes::from(vec![0u8; 100]));
483            msg.set_topic(CheetahString::from_static_str("test_topic"));
484            messages.push(ArcMut::new(msg));
485        }
486        messages
487    }
488
489    #[tokio::test]
490    async fn test_remove_message_count_correct() {
491        let pq = ProcessQueue::new();
492
493        let msgs = create_test_messages(10);
494        pq.put_message(msgs.clone()).await;
495
496        assert_eq!(pq.msg_count(), 10);
497
498        pq.remove_message(&msgs[0..5]).await;
499
500        assert_eq!(pq.msg_count(), 5);
501        assert_eq!(pq.msg_size(), 500);
502    }
503
504    #[tokio::test]
505    async fn test_remove_message_all() {
506        let pq = ProcessQueue::new();
507
508        let msgs = create_test_messages(5);
509        pq.put_message(msgs.clone()).await;
510
511        assert_eq!(pq.msg_count(), 5);
512
513        pq.remove_message(&msgs).await;
514
515        assert_eq!(pq.msg_count(), 0);
516        assert_eq!(pq.msg_size(), 0);
517    }
518
519    #[tokio::test]
520    async fn test_take_messages_updates_consuming_map() {
521        let pq = ProcessQueue::new();
522
523        let msgs = create_test_messages(10);
524        pq.put_message(msgs).await;
525
526        let taken = pq.take_messages(5).await;
527
528        assert_eq!(taken.len(), 5);
529
530        let store = pq.store.read().await;
531        assert_eq!(store.consuming_msg_orderly_tree_map.len(), 5);
532
533        for msg in &taken {
534            assert!(store.consuming_msg_orderly_tree_map.contains_key(&msg.queue_offset));
535        }
536    }
537
538    #[tokio::test]
539    async fn test_take_messages_updates_timestamp() {
540        let pq = ProcessQueue::new();
541
542        let msgs = create_test_messages(5);
543        pq.put_message(msgs).await;
544
545        let timestamp_before = pq.last_consume_timestamp.load(Ordering::Acquire);
546
547        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
548
549        pq.take_messages(3).await;
550
551        let timestamp_after = pq.last_consume_timestamp.load(Ordering::Acquire);
552
553        assert!(timestamp_after > timestamp_before);
554    }
555
556    #[tokio::test]
557    async fn test_take_messages_empty_sets_consuming_false() {
558        let pq = ProcessQueue::new();
559
560        pq.consuming.store(true, Ordering::Release);
561
562        let taken = pq.take_messages(5).await;
563
564        assert_eq!(taken.len(), 0);
565        assert!(!pq.consuming.load(Ordering::Acquire));
566    }
567
568    #[tokio::test]
569    async fn test_fill_process_queue_info() {
570        let pq = ProcessQueue::new();
571
572        let msgs = create_test_messages(10);
573        pq.put_message(msgs).await;
574
575        pq.take_messages(3).await;
576
577        let mut info = ProcessQueueInfo {
578            commit_offset: 0,
579            cached_msg_min_offset: 0,
580            cached_msg_max_offset: 0,
581            cached_msg_count: 0,
582            cached_msg_size_in_mib: 0,
583            transaction_msg_min_offset: 0,
584            transaction_msg_max_offset: 0,
585            transaction_msg_count: 0,
586            locked: false,
587            try_unlock_times: 0,
588            last_lock_timestamp: 0,
589            droped: false,
590            last_pull_timestamp: 0,
591            last_consume_timestamp: 0,
592        };
593
594        pq.fill_process_queue_info(&mut info).await;
595
596        assert_eq!(info.cached_msg_count, 7);
597        assert_eq!(info.cached_msg_min_offset, 3);
598        assert_eq!(info.cached_msg_max_offset, 9);
599
600        assert_eq!(info.transaction_msg_count, 3);
601        assert_eq!(info.transaction_msg_min_offset, 0);
602        assert_eq!(info.transaction_msg_max_offset, 2);
603    }
604
605    #[tokio::test]
606    async fn test_has_temp_message() {
607        let pq = ProcessQueue::new();
608
609        assert!(!pq.has_temp_message().await);
610
611        let msgs = create_test_messages(5);
612        pq.put_message(msgs.clone()).await;
613
614        assert!(pq.has_temp_message().await);
615
616        pq.remove_message(&msgs).await;
617
618        assert!(!pq.has_temp_message().await);
619    }
620
621    #[tokio::test]
622    async fn test_rollback_after_take_messages() {
623        let pq = ProcessQueue::new();
624
625        let msgs = create_test_messages(10);
626        pq.put_message(msgs).await;
627
628        assert_eq!(pq.msg_count(), 10);
629
630        let taken = pq.take_messages(5).await;
631        assert_eq!(taken.len(), 5);
632
633        let msg_tree_map_len = pq.store.read().await.msg_tree_map.len();
634        assert_eq!(msg_tree_map_len, 5);
635
636        pq.rollback().await;
637
638        let msg_tree_map_len_after = pq.store.read().await.msg_tree_map.len();
639        assert_eq!(msg_tree_map_len_after, 10);
640
641        let consuming_map_len = pq.store.read().await.consuming_msg_orderly_tree_map.len();
642        assert_eq!(consuming_map_len, 0);
643    }
644
645    #[tokio::test]
646    async fn test_commit_after_take_messages() {
647        let pq = ProcessQueue::new();
648
649        let msgs = create_test_messages(10);
650        pq.put_message(msgs).await;
651
652        pq.take_messages(5).await;
653
654        assert_eq!(pq.msg_count(), 10);
655
656        let offset = pq.commit().await;
657
658        assert_eq!(offset, 5);
659        assert_eq!(pq.msg_count(), 5);
660
661        let consuming_map_len = pq.store.read().await.consuming_msg_orderly_tree_map.len();
662        assert_eq!(consuming_map_len, 0);
663    }
664
665    #[tokio::test]
666    async fn test_getter_methods() {
667        let pq = ProcessQueue::new();
668
669        assert_eq!(pq.get_try_unlock_times(), 0);
670        assert!(!pq.is_consuming());
671        assert!(!pq.is_dropped());
672        assert!(!pq.is_locked());
673
674        pq.inc_try_unlock_times();
675        assert_eq!(pq.get_try_unlock_times(), 1);
676
677        pq.set_consuming(true);
678        assert!(pq.is_consuming());
679
680        pq.set_dropped(true);
681        assert!(pq.is_dropped());
682
683        pq.set_locked(true);
684        assert!(pq.is_locked());
685    }
686
687    #[tokio::test]
688    async fn test_timestamp_getters() {
689        let pq = ProcessQueue::new();
690
691        let pull_ts = pq.get_last_pull_timestamp();
692        let consume_ts = pq.get_last_consume_timestamp();
693        let lock_ts = pq.get_last_lock_timestamp();
694
695        assert!(pull_ts > 0);
696        assert!(consume_ts > 0);
697        assert!(lock_ts > 0);
698
699        let new_ts = current_millis() + 1000;
700        pq.set_last_pull_timestamp(new_ts);
701        assert_eq!(pq.get_last_pull_timestamp(), new_ts);
702
703        pq.set_last_lock_timestamp(new_ts);
704        assert_eq!(pq.get_last_lock_timestamp(), new_ts);
705    }
706
707    #[tokio::test]
708    async fn test_msg_acc_cnt() {
709        let pq = ProcessQueue::new();
710
711        assert_eq!(pq.get_msg_acc_cnt(), 0);
712
713        let mut msgs = Vec::new();
714        for i in 0..5 {
715            let mut msg = MessageExt {
716                queue_offset: i as i64,
717                ..Default::default()
718            };
719            msg.set_body(Bytes::from(vec![0u8; 100]));
720            msg.set_topic(CheetahString::from_static_str("test_topic"));
721            msg.put_property(
722                CheetahString::from_static_str(MessageConst::PROPERTY_MAX_OFFSET),
723                CheetahString::from(format!("{}", i + 100)),
724            );
725            msgs.push(ArcMut::new(msg));
726        }
727
728        pq.put_message(msgs).await;
729
730        assert!(pq.get_msg_acc_cnt() > 0);
731    }
732
733    #[tokio::test]
734    async fn test_clear_resets_all_state() {
735        let pq = ProcessQueue::new();
736
737        let msgs = create_test_messages(10);
738        pq.put_message(msgs).await;
739        pq.take_messages(5).await;
740
741        assert_eq!(pq.msg_count(), 10);
742        assert!(pq.msg_size() > 0);
743
744        pq.clear().await;
745
746        assert_eq!(pq.msg_count(), 0);
747        assert_eq!(pq.msg_size(), 0);
748
749        let msg_tree_map_len = pq.store.read().await.msg_tree_map.len();
750        assert_eq!(msg_tree_map_len, 0);
751
752        let consuming_map_len = pq.store.read().await.consuming_msg_orderly_tree_map.len();
753        assert_eq!(consuming_map_len, 0);
754    }
755
756    #[tokio::test]
757    async fn test_get_max_span() {
758        let pq = ProcessQueue::new();
759
760        assert_eq!(pq.get_max_span().await, 0);
761
762        let msgs = create_test_messages(10);
763        pq.put_message(msgs).await;
764
765        let span = pq.get_max_span().await;
766        assert_eq!(span, 9);
767    }
768
769    #[tokio::test]
770    async fn test_consuming_flag_behavior() {
771        let pq = ProcessQueue::new();
772
773        assert!(!pq.is_consuming());
774
775        let msgs = create_test_messages(5);
776        let should_dispatch = pq.put_message(msgs).await;
777
778        assert!(should_dispatch);
779        assert!(pq.is_consuming());
780
781        pq.set_consuming(false);
782        assert!(!pq.is_consuming());
783    }
784
785    #[tokio::test]
786    async fn test_queue_offset_max_with_unordered_messages() {
787        let pq = ProcessQueue::new();
788
789        let mut msgs = Vec::new();
790        for offset in [5, 2, 8, 1, 9, 3].iter() {
791            let mut msg = MessageExt {
792                queue_offset: *offset,
793                ..Default::default()
794            };
795            msg.set_body(Bytes::from(vec![0u8; 100]));
796            msg.set_topic(CheetahString::from_static_str("test_topic"));
797            msgs.push(ArcMut::new(msg));
798        }
799
800        pq.put_message(msgs).await;
801
802        let max_offset = pq.store.read().await.queue_offset_max;
803        assert_eq!(max_offset, 9);
804    }
805
806    #[tokio::test]
807    async fn test_commit_preserves_msg_size_correctly() {
808        let pq = ProcessQueue::new();
809
810        let msgs = create_test_messages(10);
811        pq.put_message(msgs).await;
812
813        assert_eq!(pq.msg_size(), 1000);
814
815        pq.take_messages(3).await;
816
817        assert_eq!(pq.msg_count(), 10);
818        assert_eq!(pq.msg_size(), 1000);
819
820        pq.commit().await;
821
822        assert_eq!(pq.msg_count(), 7);
823        assert_eq!(pq.msg_size(), 700);
824    }
825
826    #[tokio::test]
827    async fn test_commit_when_all_messages_consumed() {
828        let pq = ProcessQueue::new();
829
830        let msgs = create_test_messages(5);
831        pq.put_message(msgs).await;
832
833        assert_eq!(pq.msg_size(), 500);
834
835        pq.take_messages(5).await;
836
837        pq.commit().await;
838
839        assert_eq!(pq.msg_count(), 0);
840        assert_eq!(pq.msg_size(), 0);
841    }
842}