rocketmq_cli/
content_show.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::fs;
19use std::path::PathBuf;
20
21use bytes::Buf;
22use cheetah_string::CheetahString;
23use rocketmq_common::common::message::message_decoder;
24use rocketmq_common::common::message::MessageConst;
25use rocketmq_store::log_file::mapped_file::default_mapped_file_impl::DefaultMappedFile;
26use rocketmq_store::log_file::mapped_file::MappedFile;
27use tabled::Table;
28use tabled::Tabled;
29
30pub fn print_content(from: Option<u32>, to: Option<u32>, path: Option<PathBuf>) {
31    if path.is_none() {
32        eprintln!("File path is none");
33        return;
34    }
35    let path_buf = path.unwrap().into_os_string();
36    let file_metadata = fs::metadata(path_buf.clone()).unwrap();
37    println!("file size: {}B", file_metadata.len());
38    let mapped_file = DefaultMappedFile::new(
39        CheetahString::from(path_buf.to_string_lossy().to_string()),
40        file_metadata.len(),
41    );
42    // read message number
43    let mut counter = 0;
44    let form = from.unwrap_or_default();
45    let to = to.unwrap_or(u32::MAX);
46    let mut current_pos = 0usize;
47    let mut table = vec![];
48    loop {
49        if counter >= to {
50            break;
51        }
52        let bytes = mapped_file.get_bytes(current_pos, 4);
53        if bytes.is_none() {
54            break;
55        }
56        let mut size_bytes = bytes.unwrap();
57        let size = size_bytes.get_i32();
58        if size <= 0 {
59            break;
60        }
61        counter += 1;
62        if counter < form {
63            current_pos += size as usize;
64            continue;
65        }
66        let mut msg_bytes = mapped_file.get_bytes(current_pos, size as usize);
67        current_pos += size as usize;
68        if msg_bytes.is_none() {
69            break;
70        }
71        let message =
72            message_decoder::decode(msg_bytes.as_mut().unwrap(), true, false, false, false, true);
73        //parse message bytes and print it
74        match message {
75            None => {}
76            Some(value) => {
77                table.push(MessagePrint {
78                    message_id: value.msg_id,
79                    client_message_id: value
80                        .message
81                        .get_property(&CheetahString::from_static_str(
82                            MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
83                        ))
84                        .unwrap_or(CheetahString::empty()),
85                });
86            }
87        }
88    }
89    println!("{}", Table::new(table));
90}
91
92#[derive(Tabled)]
93struct MessagePrint {
94    message_id: CheetahString,
95    client_message_id: CheetahString,
96}