Skip to main content

iggy_cli/commands/binary_message/
poll_messages.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use comfy_table::{Cell, CellAlignment, Row, Table};
23use iggy_binary_protocol::WireUserHeaders;
24use iggy_common::Client;
25use iggy_common::{
26    Consumer, HeaderKey, HeaderKind, Identifier, IggyByteSize, IggyDuration, IggyMessage,
27    IggyTimestamp, PollMessages, PollingStrategy, Sizeable,
28    wire_conversions::user_headers_from_wire,
29};
30use std::collections::HashSet;
31use tokio::io::AsyncWriteExt;
32use tracing::{Level, event};
33
/// CLI command that polls messages from a topic partition and either prints
/// them as a table or appends them to a binary file.
pub struct PollMessagesCmd {
    /// Poll request parameters (stream, topic, partition, consumer, strategy,
    /// count and auto-commit flag) passed to the client on execution.
    poll_messages: PollMessages,
    /// When `true`, user header keys are collected from the polled messages
    /// and rendered as additional table columns.
    show_headers: bool,
    /// When set, polled messages are appended to this file in binary form
    /// instead of being printed as a table.
    output_file: Option<String>,
}
39
40impl PollMessagesCmd {
41    #[allow(clippy::too_many_arguments)]
42    pub fn new(
43        stream_id: Identifier,
44        topic_id: Identifier,
45        partition_id: u32,
46        message_count: u32,
47        auto_commit: bool,
48        offset: Option<u64>,
49        first: bool,
50        last: bool,
51        next: bool,
52        consumer: Identifier,
53        show_headers: bool,
54        output_file: Option<String>,
55    ) -> Self {
56        let strategy = match (offset, first, last, next) {
57            (Some(offset), false, false, false) => PollingStrategy::offset(offset),
58            (None, true, false, false) => PollingStrategy::first(),
59            (None, false, true, false) => PollingStrategy::last(),
60            (None, false, false, true) => PollingStrategy::next(),
61            _ => unreachable!("Either offset or first, last or next must be specified"),
62        };
63        Self {
64            poll_messages: PollMessages {
65                consumer: Consumer::new(consumer),
66                stream_id,
67                topic_id,
68                partition_id: Some(partition_id),
69                strategy,
70                count: message_count,
71                auto_commit,
72            },
73            show_headers,
74            output_file,
75        }
76    }
77
78    fn create_message_header_keys(
79        &self,
80        polled_messages: &[IggyMessage],
81    ) -> HashSet<(HeaderKey, HeaderKind)> {
82        if !self.show_headers {
83            return HashSet::new();
84        }
85
86        polled_messages
87            .iter()
88            .flat_map(|m| {
89                if let Some(user_headers) = &m.user_headers {
90                    match WireUserHeaders::from_bytes(user_headers.clone())
91                        .map_err(|_| iggy_common::IggyError::InvalidHeaderKey)
92                        .and_then(|w| user_headers_from_wire(&w))
93                    {
94                        Ok(headers) => headers
95                            .iter()
96                            .map(|(k, v)| (k.clone(), v.kind()))
97                            .collect::<Vec<_>>(),
98                        Err(e) => {
99                            tracing::error!("Failed to parse user headers, error: {e}");
100                            vec![]
101                        }
102                    }
103                } else {
104                    vec![]
105                }
106            })
107            .collect::<HashSet<_>>()
108    }
109
110    fn create_table_header(header_key_set: &HashSet<(HeaderKey, HeaderKind)>) -> Row {
111        let mut table_header = vec![
112            Cell::new("Offset"),
113            Cell::new("Timestamp"),
114            Cell::new("ID"),
115            Cell::new("Length"),
116            Cell::new("Payload"),
117        ];
118        let message_headers = header_key_set
119            .iter()
120            .map(|(key, kind)| {
121                Cell::new(format!("Header: {}\n{}", key.to_string_value(), kind))
122                    .set_alignment(CellAlignment::Center)
123            })
124            .collect::<Vec<_>>();
125        table_header.extend(message_headers);
126        Row::from(table_header)
127    }
128
129    fn create_table_content(
130        polled_messages: &[IggyMessage],
131        message_header_keys: &HashSet<(HeaderKey, HeaderKind)>,
132    ) -> Vec<Row> {
133        polled_messages
134            .iter()
135            .map(|message| {
136                let mut row = vec![
137                    format!("{}", message.header.offset),
138                    IggyTimestamp::from(message.header.timestamp)
139                        .to_local_string("%Y-%m-%d %H:%M:%S%.6f"),
140                    format!("{}", message.header.id),
141                    format!("{}", message.payload.len()),
142                    String::from_utf8_lossy(&message.payload).to_string(),
143                ];
144
145                let values = message_header_keys
146                    .iter()
147                    .map(|(key, kind)| {
148                        message
149                            .user_headers_map()
150                            .expect("Failed to parse user headers")
151                            .as_ref()
152                            .map(|h| {
153                                h.get(key)
154                                    .filter(|v| v.kind() == *kind)
155                                    .map(|v| v.to_string_value())
156                                    .unwrap_or_default()
157                            })
158                            .unwrap_or_default()
159                    })
160                    .collect::<Vec<_>>();
161                row.extend(values);
162                Row::from(row)
163            })
164            .collect::<_>()
165    }
166}
167
168#[async_trait]
169impl CliCommand for PollMessagesCmd {
170    fn explain(&self) -> String {
171        format!(
172            "poll messages from topic ID: {} and stream with ID: {}",
173            self.poll_messages.topic_id, self.poll_messages.stream_id
174        )
175    }
176
177    async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
178        let start = std::time::Instant::now();
179        let polled_messages = client
180            .poll_messages(
181                &self.poll_messages.stream_id,
182                &self.poll_messages.topic_id,
183                self.poll_messages.partition_id,
184                &self.poll_messages.consumer,
185                &self.poll_messages.strategy,
186                self.poll_messages.count,
187                self.poll_messages.auto_commit,
188            )
189            .await
190            .with_context(|| {
191                format!(
192                    "Problem polling messages to topic with ID: {} and stream with ID: {}",
193                    self.poll_messages.topic_id, self.poll_messages.stream_id
194                )
195            })?;
196        let elapsed = IggyDuration::new(start.elapsed());
197
198        event!(target: PRINT_TARGET, Level::INFO,
199            "Polled messages from topic with ID: {} and stream with ID: {} (from partition with ID: {})",
200            self.poll_messages.topic_id,
201            self.poll_messages.stream_id,
202            polled_messages.partition_id,
203        );
204
205        let polled_size = IggyByteSize::from(
206            polled_messages
207                .messages
208                .iter()
209                .map(|m| m.get_size_bytes().as_bytes_u64())
210                .sum::<u64>(),
211        );
212
213        let message_count_message = match polled_messages.messages.len() {
214            1 => "1 message".into(),
215            count => format!("{count} messages"),
216        };
217        event!(target: PRINT_TARGET, Level::INFO, "Polled {message_count_message} of total size {polled_size}, it took {}", elapsed.as_human_time_string());
218
219        if let Some(output_file) = &self.output_file {
220            event!(target: PRINT_TARGET, Level::INFO, "Storing messages to {output_file} binary file");
221
222            let mut saved_size = IggyByteSize::default();
223            let mut file = tokio::fs::OpenOptions::new()
224                .append(true)
225                .create(true)
226                .open(output_file)
227                .await
228                .with_context(|| format!("Problem opening file for writing: {output_file}"))?;
229
230            for message in polled_messages.messages.iter() {
231                let message = message.to_bytes();
232                saved_size += IggyByteSize::from(message.len() as u64);
233                file.write_all(&message)
234                    .await
235                    .with_context(|| format!("Problem writing message to file: {output_file}"))?;
236            }
237
238            let saved_size_str = saved_size.as_human_string();
239            event!(target: PRINT_TARGET, Level::INFO, "Stored {message_count_message} of total size {saved_size_str} to {output_file} binary file");
240        } else {
241            let message_header_keys = self.create_message_header_keys(&polled_messages.messages);
242
243            let mut table = Table::new();
244            let table_header = Self::create_table_header(&message_header_keys);
245            let table_content =
246                Self::create_table_content(&polled_messages.messages, &message_header_keys);
247            table.set_header(table_header);
248            table.add_rows(table_content);
249
250            event!(target: PRINT_TARGET, Level::INFO, "{table}");
251        }
252
253        Ok(())
254    }
255}