1use std::collections::BTreeMap;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::AtomicI64;
18use std::sync::atomic::AtomicU64;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21use std::sync::LazyLock;
22
23use cheetah_string::CheetahString;
24use rocketmq_common::common::message::message_ext::MessageExt;
25use rocketmq_common::common::message::MessageConst;
26use rocketmq_common::common::message::MessageTrait;
27use rocketmq_common::MessageAccessor::MessageAccessor;
28use rocketmq_common::TimeUtils::current_millis;
29use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo;
30use rocketmq_rust::ArcMut;
31use tokio::sync::RwLock;
32use tracing::info;
33
34use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
35use crate::consumer::consumer_impl::PULL_MAX_IDLE_TIME;
36
/// Maximum age, in milliseconds, of a queue lock before `ProcessQueue::is_lock_expired`
/// reports it as expired.
///
/// Resolved once, on first access, from the `rocketmq.client.rebalance.lockMaxLiveTime`
/// environment variable. Falls back to `30000` when the variable is unset or does not parse
/// as a `u64`.
pub static REBALANCE_LOCK_MAX_LIVE_TIME: LazyLock<u64> = LazyLock::new(|| {
    const DEFAULT_MILLIS: u64 = 30000;
    std::env::var("rocketmq.client.rebalance.lockMaxLiveTime")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(DEFAULT_MILLIS)
});
43
/// Rebalance lock interval in milliseconds.
///
/// Resolved once, on first access, from the `rocketmq.client.rebalance.lockInterval`
/// environment variable. Falls back to `20000` when the variable is unset or does not parse
/// as a `u64`.
// NOTE(review): not read anywhere in this file; presumably the period between lock renewals —
// confirm against the rebalance service.
pub static REBALANCE_LOCK_INTERVAL: LazyLock<u64> = LazyLock::new(|| {
    const DEFAULT_MILLIS: u64 = 20000;
    std::env::var("rocketmq.client.rebalance.lockInterval")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(DEFAULT_MILLIS)
});
50
51struct ProcessQueueStore {
52 msg_tree_map: BTreeMap<i64, ArcMut<MessageExt>>,
53 consuming_msg_orderly_tree_map: BTreeMap<i64, ArcMut<MessageExt>>,
54 queue_offset_max: i64,
55}
56
57impl ProcessQueueStore {
58 fn new() -> Self {
59 ProcessQueueStore {
60 msg_tree_map: BTreeMap::new(),
61 consuming_msg_orderly_tree_map: BTreeMap::new(),
62 queue_offset_max: 0,
63 }
64 }
65}
66
67pub struct ProcessQueue {
68 store: RwLock<ProcessQueueStore>,
69 pub(crate) consume_lock: Arc<RwLock<()>>,
70
71 msg_count: AtomicI64,
72 msg_size: AtomicU64,
73 msg_acc_cnt: AtomicI64,
74 try_unlock_times: AtomicI64,
75
76 dropped: AtomicBool,
77 locked: AtomicBool,
78 consuming: AtomicBool,
79
80 last_pull_timestamp: AtomicU64,
81 last_consume_timestamp: AtomicU64,
82 last_lock_timestamp: AtomicU64,
83}
84
85impl Default for ProcessQueue {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl ProcessQueue {
92 pub fn new() -> Self {
93 let now = current_millis();
94 ProcessQueue {
95 store: RwLock::new(ProcessQueueStore::new()),
96 consume_lock: Arc::new(RwLock::new(())),
97
98 msg_count: AtomicI64::new(0),
99 msg_size: AtomicU64::new(0),
100 msg_acc_cnt: AtomicI64::new(0),
101 try_unlock_times: AtomicI64::new(0),
102
103 dropped: AtomicBool::new(false),
104 locked: AtomicBool::new(false),
105 consuming: AtomicBool::new(false),
106
107 last_pull_timestamp: AtomicU64::new(now),
108 last_consume_timestamp: AtomicU64::new(now),
109 last_lock_timestamp: AtomicU64::new(now),
110 }
111 }
112}
113
114impl ProcessQueue {
115 pub(crate) fn set_dropped(&self, dropped: bool) {
116 self.dropped.store(dropped, Ordering::Release);
117 }
118
119 pub(crate) fn is_dropped(&self) -> bool {
120 self.dropped.load(Ordering::Acquire)
121 }
122
123 pub(crate) fn get_last_lock_timestamp(&self) -> u64 {
124 self.last_lock_timestamp.load(Ordering::Acquire)
125 }
126
127 pub(crate) fn set_locked(&self, locked: bool) {
128 self.locked.store(locked, Ordering::Release);
129 }
130
131 pub(crate) fn is_pull_expired(&self) -> bool {
132 (current_millis() - self.last_pull_timestamp.load(Ordering::Acquire)) > *PULL_MAX_IDLE_TIME
133 }
134
135 pub(crate) fn is_lock_expired(&self) -> bool {
136 (current_millis() - self.last_lock_timestamp.load(Ordering::Acquire)) > *REBALANCE_LOCK_MAX_LIVE_TIME
137 }
138
139 pub(crate) fn inc_try_unlock_times(&self) {
140 self.try_unlock_times.fetch_add(1, Ordering::AcqRel);
141 }
142
143 pub(crate) async fn clean_expired_msg(&self, push_consumer: Option<ArcMut<DefaultMQPushConsumerImpl>>) {
144 let mut push_consumer = match push_consumer {
145 Some(pc) => pc,
146 None => return,
147 };
148
149 if push_consumer.is_consume_orderly() {
150 return;
151 }
152
153 let loop_ = 16.min(self.store.read().await.msg_tree_map.len());
154 for _ in 0..loop_ {
155 let msg = {
156 let store = self.store.read().await;
157 if let Some((_, value)) = store.msg_tree_map.first_key_value() {
158 let consume_start_time_stamp = MessageAccessor::get_consume_start_time_stamp(value.as_ref());
159 if let Some(ts_str) = consume_start_time_stamp {
160 if let Ok(ts) = ts_str.parse::<u64>() {
161 if current_millis() - ts > push_consumer.consumer_config.consume_timeout * 1000 * 60 {
162 Some(value.clone())
163 } else {
164 None
165 }
166 } else {
167 None
168 }
169 } else {
170 None
171 }
172 } else {
173 None
174 }
175 };
176
177 let mut msg = match msg {
178 Some(m) => m,
179 None => break,
180 };
181
182 let msg_inner = msg.as_mut();
183 let topic = push_consumer.client_config.with_namespace(msg_inner.topic());
184 let queue_id = msg_inner.queue_id();
185 let msg_id = msg_inner.msg_id().to_string();
186 let store_host = format!("{:?}", msg_inner.store_host());
187 let queue_offset = msg_inner.queue_offset;
188 msg_inner.set_topic(topic);
189 let topic_name = msg_inner.topic().to_string();
190 let _ = push_consumer
191 .send_message_back_with_broker_name(msg_inner, 3, None, None)
192 .await;
193 info!(
194 "send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
195 topic_name, msg_id, store_host, queue_id, queue_offset
196 );
197
198 let should_remove = {
199 let store = self.store.read().await;
200 !store.msg_tree_map.is_empty() && queue_offset == *store.msg_tree_map.first_key_value().unwrap().0
201 };
202 if should_remove {
203 self.remove_message(&[msg]).await;
204 }
205 }
206 }
207
208 pub(crate) async fn put_message(&self, messages: Vec<ArcMut<MessageExt>>) -> bool {
209 let mut dispatch_to_consume = false;
210 let mut store = self.store.write().await;
211 let mut valid_msg_cnt = 0i64;
212
213 let acc_total = if let Some(last_msg) = messages.last() {
214 if let Some(property) =
215 last_msg.property(&CheetahString::from_static_str(MessageConst::PROPERTY_MAX_OFFSET))
216 {
217 property.parse::<i64>().unwrap_or(0) - last_msg.queue_offset
218 } else {
219 0
220 }
221 } else {
222 0
223 };
224
225 for message in messages {
226 if store
227 .msg_tree_map
228 .insert(message.queue_offset, message.clone())
229 .is_none()
230 {
231 valid_msg_cnt += 1;
232 if message.queue_offset > store.queue_offset_max {
233 store.queue_offset_max = message.queue_offset;
234 }
235 let body_len = message.body().map_or(0, |b| b.len()) as u64;
236 self.msg_size.fetch_add(body_len, Ordering::AcqRel);
237 }
238 }
239 self.msg_count.fetch_add(valid_msg_cnt, Ordering::AcqRel);
240 if !store.msg_tree_map.is_empty() && !self.consuming.load(Ordering::Acquire) {
241 dispatch_to_consume = true;
242 self.consuming.store(true, Ordering::Release);
243 }
244 if acc_total > 0 {
245 self.msg_acc_cnt.store(acc_total, Ordering::Release);
246 }
247 dispatch_to_consume
248 }
249
250 pub(crate) async fn get_max_span(&self) -> u64 {
251 let store = self.store.read().await;
252 if store.msg_tree_map.is_empty() {
253 return 0;
254 }
255 let first = store.msg_tree_map.first_key_value().unwrap();
256 let last = store.msg_tree_map.last_key_value().unwrap();
257 (last.0 - first.0) as u64
258 }
259
260 pub(crate) async fn remove_message(&self, messages: &[ArcMut<MessageExt>]) -> i64 {
261 let now = current_millis();
262 let mut store = self.store.write().await;
263
264 self.last_consume_timestamp.store(now, Ordering::Release);
265
266 if store.msg_tree_map.is_empty() {
267 return -1;
268 }
269
270 let mut result = store.queue_offset_max + 1;
271 let mut removed_cnt = 0i64;
272
273 for message in messages {
274 if store.msg_tree_map.remove(&message.queue_offset).is_some() {
275 removed_cnt += 1;
276 let body_len = message.body().map_or(0, |b| b.len()) as u64;
277 if body_len > 0 {
278 self.msg_size.fetch_sub(body_len, Ordering::AcqRel);
279 }
280 }
281 }
282
283 if removed_cnt > 0 {
284 let prev = self.msg_count.fetch_sub(removed_cnt, Ordering::AcqRel);
285 if prev == removed_cnt {
286 self.msg_size.store(0, Ordering::Release);
287 }
288 }
289
290 if !store.msg_tree_map.is_empty() {
291 result = *store.msg_tree_map.first_key_value().unwrap().0;
292 }
293
294 result
295 }
296
297 pub(crate) async fn rollback(&self) {
298 let mut store = self.store.write().await;
299 let mut consuming = std::mem::take(&mut store.consuming_msg_orderly_tree_map);
300 store.msg_tree_map.append(&mut consuming);
301 }
302
303 pub(crate) async fn commit(&self) -> i64 {
304 let mut store = self.store.write().await;
305 let offset = store
306 .consuming_msg_orderly_tree_map
307 .last_key_value()
308 .map_or(-1, |(k, _)| *k + 1);
309
310 let consumed_count = store.consuming_msg_orderly_tree_map.len() as i64;
311 let prev_count = self.msg_count.fetch_sub(consumed_count, Ordering::AcqRel);
312
313 if prev_count == consumed_count {
314 self.msg_size.store(0, Ordering::Release);
315 } else {
316 for message in store.consuming_msg_orderly_tree_map.values() {
317 let body_len = message.body().map_or(0, |b| b.len()) as u64;
318 if body_len > 0 {
319 self.msg_size.fetch_sub(body_len, Ordering::AcqRel);
320 }
321 }
322 }
323 store.consuming_msg_orderly_tree_map.clear();
324 offset
325 }
326
327 pub(crate) async fn make_message_to_consume_again(&self, messages: &[ArcMut<MessageExt>]) {
328 let mut store = self.store.write().await;
329 for message in messages {
330 store.consuming_msg_orderly_tree_map.remove(&message.queue_offset);
331 store.msg_tree_map.insert(message.queue_offset, message.clone());
332 }
333 }
334
335 pub(crate) async fn take_messages(&self, batch_size: u32) -> Vec<ArcMut<MessageExt>> {
336 let mut messages = Vec::with_capacity(batch_size as usize);
337 let now = current_millis();
338 let mut store = self.store.write().await;
339
340 self.last_consume_timestamp.store(now, Ordering::Release);
341
342 for _ in 0..batch_size {
343 if let Some((offset, message)) = store.msg_tree_map.pop_first() {
344 store.consuming_msg_orderly_tree_map.insert(offset, message.clone());
345 messages.push(message);
346 } else {
347 break;
348 }
349 }
350
351 if messages.is_empty() {
352 self.consuming.store(false, Ordering::Release);
353 }
354
355 messages
356 }
357
358 pub(crate) async fn contains_message(&self, message_ext: &MessageExt) -> bool {
359 self.store
360 .read()
361 .await
362 .msg_tree_map
363 .contains_key(&message_ext.queue_offset)
364 }
365
366 pub(crate) async fn clear(&self) {
367 let mut store = self.store.write().await;
368 store.msg_tree_map.clear();
369 store.consuming_msg_orderly_tree_map.clear();
370 store.queue_offset_max = 0;
371 self.msg_count.store(0, Ordering::Release);
372 self.msg_size.store(0, Ordering::Release);
373 }
374
375 pub(crate) async fn fill_process_queue_info(&self, info: &mut ProcessQueueInfo) {
376 let store = self.store.read().await;
377
378 if !store.msg_tree_map.is_empty() {
379 if let Some((min_offset, _)) = store.msg_tree_map.first_key_value() {
380 info.cached_msg_min_offset = *min_offset as u64;
381 }
382 if let Some((max_offset, _)) = store.msg_tree_map.last_key_value() {
383 info.cached_msg_max_offset = *max_offset as u64;
384 }
385 info.cached_msg_count = store.msg_tree_map.len() as u32;
386 }
387
388 info.cached_msg_size_in_mib = (self.msg_size.load(Ordering::Acquire) / (1024 * 1024)) as u32;
389
390 if !store.consuming_msg_orderly_tree_map.is_empty() {
391 if let Some((min_offset, _)) = store.consuming_msg_orderly_tree_map.first_key_value() {
392 info.transaction_msg_min_offset = *min_offset as u64;
393 }
394 if let Some((max_offset, _)) = store.consuming_msg_orderly_tree_map.last_key_value() {
395 info.transaction_msg_max_offset = *max_offset as u64;
396 }
397 info.transaction_msg_count = store.consuming_msg_orderly_tree_map.len() as u32;
398 }
399
400 info.locked = self.locked.load(Ordering::Acquire);
401 info.try_unlock_times = self.try_unlock_times.load(Ordering::Acquire) as u64;
402 info.last_lock_timestamp = self.last_lock_timestamp.load(Ordering::Acquire);
403 info.droped = self.dropped.load(Ordering::Acquire);
404 info.last_pull_timestamp = self.last_pull_timestamp.load(Ordering::Acquire);
405 info.last_consume_timestamp = self.last_consume_timestamp.load(Ordering::Acquire);
406 }
407
408 pub(crate) async fn get_offset_span(&self) -> Option<(i64, i64)> {
410 let store = self.store.read().await;
411 if store.msg_tree_map.is_empty() {
412 return None;
413 }
414 let min = *store.msg_tree_map.first_key_value().unwrap().0;
415 let max = *store.msg_tree_map.last_key_value().unwrap().0;
416 Some((min, max))
417 }
418
419 pub(crate) fn set_last_pull_timestamp(&self, last_pull_timestamp: u64) {
420 self.last_pull_timestamp.store(last_pull_timestamp, Ordering::Release);
421 }
422
423 pub(crate) fn set_last_lock_timestamp(&self, last_lock_timestamp: u64) {
424 self.last_lock_timestamp.store(last_lock_timestamp, Ordering::Release);
425 }
426
427 pub fn msg_count(&self) -> u64 {
428 self.msg_count.load(Ordering::Acquire).max(0) as u64
429 }
430
431 pub(crate) fn msg_size(&self) -> u64 {
432 self.msg_size.load(Ordering::Acquire)
433 }
434
435 pub(crate) fn is_locked(&self) -> bool {
436 self.locked.load(Ordering::Acquire)
437 }
438
439 pub(crate) async fn has_temp_message(&self) -> bool {
440 !self.store.read().await.msg_tree_map.is_empty()
441 }
442
443 pub(crate) fn get_last_pull_timestamp(&self) -> u64 {
444 self.last_pull_timestamp.load(Ordering::Acquire)
445 }
446
447 pub(crate) fn get_last_consume_timestamp(&self) -> u64 {
448 self.last_consume_timestamp.load(Ordering::Acquire)
449 }
450
451 pub(crate) fn get_msg_acc_cnt(&self) -> i64 {
452 self.msg_acc_cnt.load(Ordering::Acquire)
453 }
454
455 pub(crate) fn get_try_unlock_times(&self) -> i64 {
456 self.try_unlock_times.load(Ordering::Acquire)
457 }
458
459 pub(crate) fn is_consuming(&self) -> bool {
460 self.consuming.load(Ordering::Acquire)
461 }
462
463 pub(crate) fn set_consuming(&self, consuming: bool) {
464 self.consuming.store(consuming, Ordering::Release);
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471 use bytes::Bytes;
472 use cheetah_string::CheetahString;
473 use rocketmq_common::common::message::MessageTrait;
474
475 fn create_test_messages(count: usize) -> Vec<ArcMut<MessageExt>> {
476 let mut messages = Vec::with_capacity(count);
477 for i in 0..count {
478 let mut msg = MessageExt {
479 queue_offset: i as i64,
480 ..Default::default()
481 };
482 msg.set_body(Bytes::from(vec![0u8; 100]));
483 msg.set_topic(CheetahString::from_static_str("test_topic"));
484 messages.push(ArcMut::new(msg));
485 }
486 messages
487 }
488
489 #[tokio::test]
490 async fn test_remove_message_count_correct() {
491 let pq = ProcessQueue::new();
492
493 let msgs = create_test_messages(10);
494 pq.put_message(msgs.clone()).await;
495
496 assert_eq!(pq.msg_count(), 10);
497
498 pq.remove_message(&msgs[0..5]).await;
499
500 assert_eq!(pq.msg_count(), 5);
501 assert_eq!(pq.msg_size(), 500);
502 }
503
504 #[tokio::test]
505 async fn test_remove_message_all() {
506 let pq = ProcessQueue::new();
507
508 let msgs = create_test_messages(5);
509 pq.put_message(msgs.clone()).await;
510
511 assert_eq!(pq.msg_count(), 5);
512
513 pq.remove_message(&msgs).await;
514
515 assert_eq!(pq.msg_count(), 0);
516 assert_eq!(pq.msg_size(), 0);
517 }
518
519 #[tokio::test]
520 async fn test_take_messages_updates_consuming_map() {
521 let pq = ProcessQueue::new();
522
523 let msgs = create_test_messages(10);
524 pq.put_message(msgs).await;
525
526 let taken = pq.take_messages(5).await;
527
528 assert_eq!(taken.len(), 5);
529
530 let store = pq.store.read().await;
531 assert_eq!(store.consuming_msg_orderly_tree_map.len(), 5);
532
533 for msg in &taken {
534 assert!(store.consuming_msg_orderly_tree_map.contains_key(&msg.queue_offset));
535 }
536 }
537
538 #[tokio::test]
539 async fn test_take_messages_updates_timestamp() {
540 let pq = ProcessQueue::new();
541
542 let msgs = create_test_messages(5);
543 pq.put_message(msgs).await;
544
545 let timestamp_before = pq.last_consume_timestamp.load(Ordering::Acquire);
546
547 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
548
549 pq.take_messages(3).await;
550
551 let timestamp_after = pq.last_consume_timestamp.load(Ordering::Acquire);
552
553 assert!(timestamp_after > timestamp_before);
554 }
555
556 #[tokio::test]
557 async fn test_take_messages_empty_sets_consuming_false() {
558 let pq = ProcessQueue::new();
559
560 pq.consuming.store(true, Ordering::Release);
561
562 let taken = pq.take_messages(5).await;
563
564 assert_eq!(taken.len(), 0);
565 assert!(!pq.consuming.load(Ordering::Acquire));
566 }
567
568 #[tokio::test]
569 async fn test_fill_process_queue_info() {
570 let pq = ProcessQueue::new();
571
572 let msgs = create_test_messages(10);
573 pq.put_message(msgs).await;
574
575 pq.take_messages(3).await;
576
577 let mut info = ProcessQueueInfo {
578 commit_offset: 0,
579 cached_msg_min_offset: 0,
580 cached_msg_max_offset: 0,
581 cached_msg_count: 0,
582 cached_msg_size_in_mib: 0,
583 transaction_msg_min_offset: 0,
584 transaction_msg_max_offset: 0,
585 transaction_msg_count: 0,
586 locked: false,
587 try_unlock_times: 0,
588 last_lock_timestamp: 0,
589 droped: false,
590 last_pull_timestamp: 0,
591 last_consume_timestamp: 0,
592 };
593
594 pq.fill_process_queue_info(&mut info).await;
595
596 assert_eq!(info.cached_msg_count, 7);
597 assert_eq!(info.cached_msg_min_offset, 3);
598 assert_eq!(info.cached_msg_max_offset, 9);
599
600 assert_eq!(info.transaction_msg_count, 3);
601 assert_eq!(info.transaction_msg_min_offset, 0);
602 assert_eq!(info.transaction_msg_max_offset, 2);
603 }
604
605 #[tokio::test]
606 async fn test_has_temp_message() {
607 let pq = ProcessQueue::new();
608
609 assert!(!pq.has_temp_message().await);
610
611 let msgs = create_test_messages(5);
612 pq.put_message(msgs.clone()).await;
613
614 assert!(pq.has_temp_message().await);
615
616 pq.remove_message(&msgs).await;
617
618 assert!(!pq.has_temp_message().await);
619 }
620
621 #[tokio::test]
622 async fn test_rollback_after_take_messages() {
623 let pq = ProcessQueue::new();
624
625 let msgs = create_test_messages(10);
626 pq.put_message(msgs).await;
627
628 assert_eq!(pq.msg_count(), 10);
629
630 let taken = pq.take_messages(5).await;
631 assert_eq!(taken.len(), 5);
632
633 let msg_tree_map_len = pq.store.read().await.msg_tree_map.len();
634 assert_eq!(msg_tree_map_len, 5);
635
636 pq.rollback().await;
637
638 let msg_tree_map_len_after = pq.store.read().await.msg_tree_map.len();
639 assert_eq!(msg_tree_map_len_after, 10);
640
641 let consuming_map_len = pq.store.read().await.consuming_msg_orderly_tree_map.len();
642 assert_eq!(consuming_map_len, 0);
643 }
644
645 #[tokio::test]
646 async fn test_commit_after_take_messages() {
647 let pq = ProcessQueue::new();
648
649 let msgs = create_test_messages(10);
650 pq.put_message(msgs).await;
651
652 pq.take_messages(5).await;
653
654 assert_eq!(pq.msg_count(), 10);
655
656 let offset = pq.commit().await;
657
658 assert_eq!(offset, 5);
659 assert_eq!(pq.msg_count(), 5);
660
661 let consuming_map_len = pq.store.read().await.consuming_msg_orderly_tree_map.len();
662 assert_eq!(consuming_map_len, 0);
663 }
664
665 #[tokio::test]
666 async fn test_getter_methods() {
667 let pq = ProcessQueue::new();
668
669 assert_eq!(pq.get_try_unlock_times(), 0);
670 assert!(!pq.is_consuming());
671 assert!(!pq.is_dropped());
672 assert!(!pq.is_locked());
673
674 pq.inc_try_unlock_times();
675 assert_eq!(pq.get_try_unlock_times(), 1);
676
677 pq.set_consuming(true);
678 assert!(pq.is_consuming());
679
680 pq.set_dropped(true);
681 assert!(pq.is_dropped());
682
683 pq.set_locked(true);
684 assert!(pq.is_locked());
685 }
686
687 #[tokio::test]
688 async fn test_timestamp_getters() {
689 let pq = ProcessQueue::new();
690
691 let pull_ts = pq.get_last_pull_timestamp();
692 let consume_ts = pq.get_last_consume_timestamp();
693 let lock_ts = pq.get_last_lock_timestamp();
694
695 assert!(pull_ts > 0);
696 assert!(consume_ts > 0);
697 assert!(lock_ts > 0);
698
699 let new_ts = current_millis() + 1000;
700 pq.set_last_pull_timestamp(new_ts);
701 assert_eq!(pq.get_last_pull_timestamp(), new_ts);
702
703 pq.set_last_lock_timestamp(new_ts);
704 assert_eq!(pq.get_last_lock_timestamp(), new_ts);
705 }
706
707 #[tokio::test]
708 async fn test_msg_acc_cnt() {
709 let pq = ProcessQueue::new();
710
711 assert_eq!(pq.get_msg_acc_cnt(), 0);
712
713 let mut msgs = Vec::new();
714 for i in 0..5 {
715 let mut msg = MessageExt {
716 queue_offset: i as i64,
717 ..Default::default()
718 };
719 msg.set_body(Bytes::from(vec![0u8; 100]));
720 msg.set_topic(CheetahString::from_static_str("test_topic"));
721 msg.put_property(
722 CheetahString::from_static_str(MessageConst::PROPERTY_MAX_OFFSET),
723 CheetahString::from(format!("{}", i + 100)),
724 );
725 msgs.push(ArcMut::new(msg));
726 }
727
728 pq.put_message(msgs).await;
729
730 assert!(pq.get_msg_acc_cnt() > 0);
731 }
732
733 #[tokio::test]
734 async fn test_clear_resets_all_state() {
735 let pq = ProcessQueue::new();
736
737 let msgs = create_test_messages(10);
738 pq.put_message(msgs).await;
739 pq.take_messages(5).await;
740
741 assert_eq!(pq.msg_count(), 10);
742 assert!(pq.msg_size() > 0);
743
744 pq.clear().await;
745
746 assert_eq!(pq.msg_count(), 0);
747 assert_eq!(pq.msg_size(), 0);
748
749 let msg_tree_map_len = pq.store.read().await.msg_tree_map.len();
750 assert_eq!(msg_tree_map_len, 0);
751
752 let consuming_map_len = pq.store.read().await.consuming_msg_orderly_tree_map.len();
753 assert_eq!(consuming_map_len, 0);
754 }
755
756 #[tokio::test]
757 async fn test_get_max_span() {
758 let pq = ProcessQueue::new();
759
760 assert_eq!(pq.get_max_span().await, 0);
761
762 let msgs = create_test_messages(10);
763 pq.put_message(msgs).await;
764
765 let span = pq.get_max_span().await;
766 assert_eq!(span, 9);
767 }
768
769 #[tokio::test]
770 async fn test_consuming_flag_behavior() {
771 let pq = ProcessQueue::new();
772
773 assert!(!pq.is_consuming());
774
775 let msgs = create_test_messages(5);
776 let should_dispatch = pq.put_message(msgs).await;
777
778 assert!(should_dispatch);
779 assert!(pq.is_consuming());
780
781 pq.set_consuming(false);
782 assert!(!pq.is_consuming());
783 }
784
785 #[tokio::test]
786 async fn test_queue_offset_max_with_unordered_messages() {
787 let pq = ProcessQueue::new();
788
789 let mut msgs = Vec::new();
790 for offset in [5, 2, 8, 1, 9, 3].iter() {
791 let mut msg = MessageExt {
792 queue_offset: *offset,
793 ..Default::default()
794 };
795 msg.set_body(Bytes::from(vec![0u8; 100]));
796 msg.set_topic(CheetahString::from_static_str("test_topic"));
797 msgs.push(ArcMut::new(msg));
798 }
799
800 pq.put_message(msgs).await;
801
802 let max_offset = pq.store.read().await.queue_offset_max;
803 assert_eq!(max_offset, 9);
804 }
805
806 #[tokio::test]
807 async fn test_commit_preserves_msg_size_correctly() {
808 let pq = ProcessQueue::new();
809
810 let msgs = create_test_messages(10);
811 pq.put_message(msgs).await;
812
813 assert_eq!(pq.msg_size(), 1000);
814
815 pq.take_messages(3).await;
816
817 assert_eq!(pq.msg_count(), 10);
818 assert_eq!(pq.msg_size(), 1000);
819
820 pq.commit().await;
821
822 assert_eq!(pq.msg_count(), 7);
823 assert_eq!(pq.msg_size(), 700);
824 }
825
826 #[tokio::test]
827 async fn test_commit_when_all_messages_consumed() {
828 let pq = ProcessQueue::new();
829
830 let msgs = create_test_messages(5);
831 pq.put_message(msgs).await;
832
833 assert_eq!(pq.msg_size(), 500);
834
835 pq.take_messages(5).await;
836
837 pq.commit().await;
838
839 assert_eq!(pq.msg_count(), 0);
840 assert_eq!(pq.msg_size(), 0);
841 }
842}