iggy_cli/commands/binary_message/
poll_messages.rs1use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use comfy_table::{Cell, CellAlignment, Row, Table};
23use iggy_binary_protocol::WireUserHeaders;
24use iggy_common::Client;
25use iggy_common::{
26 Consumer, HeaderKey, HeaderKind, Identifier, IggyByteSize, IggyDuration, IggyMessage,
27 IggyTimestamp, PollMessages, PollingStrategy, Sizeable,
28 wire_conversions::user_headers_from_wire,
29};
30use std::collections::HashSet;
31use tokio::io::AsyncWriteExt;
32use tracing::{Level, event};
33
34pub struct PollMessagesCmd {
35 poll_messages: PollMessages,
36 show_headers: bool,
37 output_file: Option<String>,
38}
39
40impl PollMessagesCmd {
41 #[allow(clippy::too_many_arguments)]
42 pub fn new(
43 stream_id: Identifier,
44 topic_id: Identifier,
45 partition_id: u32,
46 message_count: u32,
47 auto_commit: bool,
48 offset: Option<u64>,
49 first: bool,
50 last: bool,
51 next: bool,
52 consumer: Identifier,
53 show_headers: bool,
54 output_file: Option<String>,
55 ) -> Self {
56 let strategy = match (offset, first, last, next) {
57 (Some(offset), false, false, false) => PollingStrategy::offset(offset),
58 (None, true, false, false) => PollingStrategy::first(),
59 (None, false, true, false) => PollingStrategy::last(),
60 (None, false, false, true) => PollingStrategy::next(),
61 _ => unreachable!("Either offset or first, last or next must be specified"),
62 };
63 Self {
64 poll_messages: PollMessages {
65 consumer: Consumer::new(consumer),
66 stream_id,
67 topic_id,
68 partition_id: Some(partition_id),
69 strategy,
70 count: message_count,
71 auto_commit,
72 },
73 show_headers,
74 output_file,
75 }
76 }
77
78 fn create_message_header_keys(
79 &self,
80 polled_messages: &[IggyMessage],
81 ) -> HashSet<(HeaderKey, HeaderKind)> {
82 if !self.show_headers {
83 return HashSet::new();
84 }
85
86 polled_messages
87 .iter()
88 .flat_map(|m| {
89 if let Some(user_headers) = &m.user_headers {
90 match WireUserHeaders::from_bytes(user_headers.clone())
91 .map_err(|_| iggy_common::IggyError::InvalidHeaderKey)
92 .and_then(|w| user_headers_from_wire(&w))
93 {
94 Ok(headers) => headers
95 .iter()
96 .map(|(k, v)| (k.clone(), v.kind()))
97 .collect::<Vec<_>>(),
98 Err(e) => {
99 tracing::error!("Failed to parse user headers, error: {e}");
100 vec![]
101 }
102 }
103 } else {
104 vec![]
105 }
106 })
107 .collect::<HashSet<_>>()
108 }
109
110 fn create_table_header(header_key_set: &HashSet<(HeaderKey, HeaderKind)>) -> Row {
111 let mut table_header = vec![
112 Cell::new("Offset"),
113 Cell::new("Timestamp"),
114 Cell::new("ID"),
115 Cell::new("Length"),
116 Cell::new("Payload"),
117 ];
118 let message_headers = header_key_set
119 .iter()
120 .map(|(key, kind)| {
121 Cell::new(format!("Header: {}\n{}", key.to_string_value(), kind))
122 .set_alignment(CellAlignment::Center)
123 })
124 .collect::<Vec<_>>();
125 table_header.extend(message_headers);
126 Row::from(table_header)
127 }
128
129 fn create_table_content(
130 polled_messages: &[IggyMessage],
131 message_header_keys: &HashSet<(HeaderKey, HeaderKind)>,
132 ) -> Vec<Row> {
133 polled_messages
134 .iter()
135 .map(|message| {
136 let mut row = vec![
137 format!("{}", message.header.offset),
138 IggyTimestamp::from(message.header.timestamp)
139 .to_local_string("%Y-%m-%d %H:%M:%S%.6f"),
140 format!("{}", message.header.id),
141 format!("{}", message.payload.len()),
142 String::from_utf8_lossy(&message.payload).to_string(),
143 ];
144
145 let values = message_header_keys
146 .iter()
147 .map(|(key, kind)| {
148 message
149 .user_headers_map()
150 .expect("Failed to parse user headers")
151 .as_ref()
152 .map(|h| {
153 h.get(key)
154 .filter(|v| v.kind() == *kind)
155 .map(|v| v.to_string_value())
156 .unwrap_or_default()
157 })
158 .unwrap_or_default()
159 })
160 .collect::<Vec<_>>();
161 row.extend(values);
162 Row::from(row)
163 })
164 .collect::<_>()
165 }
166}
167
168#[async_trait]
169impl CliCommand for PollMessagesCmd {
170 fn explain(&self) -> String {
171 format!(
172 "poll messages from topic ID: {} and stream with ID: {}",
173 self.poll_messages.topic_id, self.poll_messages.stream_id
174 )
175 }
176
177 async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
178 let start = std::time::Instant::now();
179 let polled_messages = client
180 .poll_messages(
181 &self.poll_messages.stream_id,
182 &self.poll_messages.topic_id,
183 self.poll_messages.partition_id,
184 &self.poll_messages.consumer,
185 &self.poll_messages.strategy,
186 self.poll_messages.count,
187 self.poll_messages.auto_commit,
188 )
189 .await
190 .with_context(|| {
191 format!(
192 "Problem polling messages to topic with ID: {} and stream with ID: {}",
193 self.poll_messages.topic_id, self.poll_messages.stream_id
194 )
195 })?;
196 let elapsed = IggyDuration::new(start.elapsed());
197
198 event!(target: PRINT_TARGET, Level::INFO,
199 "Polled messages from topic with ID: {} and stream with ID: {} (from partition with ID: {})",
200 self.poll_messages.topic_id,
201 self.poll_messages.stream_id,
202 polled_messages.partition_id,
203 );
204
205 let polled_size = IggyByteSize::from(
206 polled_messages
207 .messages
208 .iter()
209 .map(|m| m.get_size_bytes().as_bytes_u64())
210 .sum::<u64>(),
211 );
212
213 let message_count_message = match polled_messages.messages.len() {
214 1 => "1 message".into(),
215 count => format!("{count} messages"),
216 };
217 event!(target: PRINT_TARGET, Level::INFO, "Polled {message_count_message} of total size {polled_size}, it took {}", elapsed.as_human_time_string());
218
219 if let Some(output_file) = &self.output_file {
220 event!(target: PRINT_TARGET, Level::INFO, "Storing messages to {output_file} binary file");
221
222 let mut saved_size = IggyByteSize::default();
223 let mut file = tokio::fs::OpenOptions::new()
224 .append(true)
225 .create(true)
226 .open(output_file)
227 .await
228 .with_context(|| format!("Problem opening file for writing: {output_file}"))?;
229
230 for message in polled_messages.messages.iter() {
231 let message = message.to_bytes();
232 saved_size += IggyByteSize::from(message.len() as u64);
233 file.write_all(&message)
234 .await
235 .with_context(|| format!("Problem writing message to file: {output_file}"))?;
236 }
237
238 let saved_size_str = saved_size.as_human_string();
239 event!(target: PRINT_TARGET, Level::INFO, "Stored {message_count_message} of total size {saved_size_str} to {output_file} binary file");
240 } else {
241 let message_header_keys = self.create_message_header_keys(&polled_messages.messages);
242
243 let mut table = Table::new();
244 let table_header = Self::create_table_header(&message_header_keys);
245 let table_content =
246 Self::create_table_content(&polled_messages.messages, &message_header_keys);
247 table.set_header(table_header);
248 table.add_rows(table_content);
249
250 event!(target: PRINT_TARGET, Level::INFO, "{table}");
251 }
252
253 Ok(())
254 }
255}