iggy_cli/commands/binary_message/
send_messages.rs1use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use bytes::Bytes;
23use iggy_common::Client;
24use iggy_common::{HeaderKey, HeaderValue, Identifier, IggyMessage, Partitioning, Sizeable};
25use std::collections::BTreeMap;
26use std::io::{self, Read};
27use tokio::io::AsyncReadExt;
28use tracing::{Level, event};
29
30pub struct SendMessagesCmd {
31 stream_id: Identifier,
32 topic_id: Identifier,
33 partitioning: Partitioning,
34 messages: Option<Vec<String>>,
35 headers: Vec<(HeaderKey, HeaderValue)>,
36 input_file: Option<String>,
37}
38
39impl SendMessagesCmd {
40 pub fn new(
41 stream_id: Identifier,
42 topic_id: Identifier,
43 partition_id: Option<u32>,
44 message_key: Option<String>,
45 messages: Option<Vec<String>>,
46 headers: Vec<(HeaderKey, HeaderValue)>,
47 input_file: Option<String>,
48 ) -> Self {
49 let partitioning = match (partition_id, message_key) {
50 (Some(_), Some(_)) => unreachable!(),
51 (Some(partition_id), None) => Partitioning::partition_id(partition_id),
52 (None, Some(message_key)) => Partitioning::messages_key_str(message_key.as_str())
53 .unwrap_or_else(|_| {
54 panic!("Failed to create Partitioning with {message_key} string message key")
55 }),
56 (None, None) => Partitioning::default(),
57 };
58 Self {
59 stream_id,
60 topic_id,
61 partitioning,
62 messages,
63 headers,
64 input_file,
65 }
66 }
67
68 fn read_message_from_stdin(&self) -> Result<String, io::Error> {
69 let mut buffer = String::new();
70
71 io::stdin().read_to_string(&mut buffer)?;
72
73 Ok(buffer)
74 }
75
76 fn get_headers(&self) -> Option<BTreeMap<HeaderKey, HeaderValue>> {
77 match self.headers.len() {
78 0 => None,
79 _ => Some(self.headers.iter().cloned().collect()),
80 }
81 }
82}
83
84#[async_trait]
85impl CliCommand for SendMessagesCmd {
86 fn explain(&self) -> String {
87 format!(
88 "send messages to topic with ID: {} and stream with ID: {}",
89 self.topic_id, self.stream_id
90 )
91 }
92
93 async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
94 let mut messages = if let Some(input_file) = &self.input_file {
95 let mut file = tokio::fs::OpenOptions::new()
96 .read(true)
97 .open(input_file)
98 .await
99 .with_context(|| format!("Problem opening file for reading: {input_file}"))?;
100 let mut buffer = Vec::new();
101 file.read_to_end(&mut buffer)
102 .await
103 .with_context(|| format!("Problem reading file: {input_file}"))?;
104
105 event!(target: PRINT_TARGET, Level::INFO,
106 "Read {} bytes from {} file", buffer.len(), input_file,
107 );
108
109 let mut messages: Vec<IggyMessage> = Vec::new();
110 let mut bytes_read = 0usize;
111 let all_messages_bytes: Bytes = buffer.into();
112
113 while bytes_read < all_messages_bytes.len() {
114 let message_bytes = all_messages_bytes.slice(bytes_read..);
115 let message = IggyMessage::from_bytes(message_bytes);
116 match message {
117 Ok(message) => {
118 let message_size = message.get_size_bytes().as_bytes_usize();
119 messages.push(message);
120 bytes_read += message_size;
121 }
122 Err(e) => {
123 event!(target: PRINT_TARGET, Level::ERROR,
124 "Failed to parse message from bytes: {e} at offset {bytes_read}",
125 );
126 break;
127 }
128 }
129 }
130 event!(target: PRINT_TARGET, Level::INFO,
131 "Created {} messages using {bytes_read} bytes", messages.len(),
132 );
133
134 messages
135 } else {
136 let headers = self.get_headers();
137 match &self.messages {
138 Some(messages) => messages
139 .iter()
140 .map(|s| {
141 IggyMessage::builder()
142 .payload(Bytes::from(s.clone()))
143 .user_headers(headers.clone().unwrap_or_default())
144 .build()
145 })
146 .collect::<Result<Vec<_>, _>>()
147 .with_context(|| "Failed to create messages from provided strings")?,
148 None => {
149 let input = self.read_message_from_stdin()?;
150
151 input
152 .lines()
153 .map(|m| {
154 IggyMessage::builder()
155 .payload(m.to_owned().into())
156 .user_headers(headers.clone().unwrap_or_default())
157 .build()
158 })
159 .collect::<Result<Vec<_>, _>>()
160 .with_context(|| "Failed to create messages from stdin")?
161 }
162 }
163 };
164
165 client
166 .send_messages(
167 &self.stream_id,
168 &self.topic_id,
169 &self.partitioning,
170 &mut messages,
171 )
172 .await
173 .with_context(|| {
174 format!(
175 "Problem sending messages to topic with ID: {} and stream with ID: {}",
176 self.topic_id, self.stream_id
177 )
178 })?;
179
180 event!(target: PRINT_TARGET, Level::INFO,
181 "Sent messages to topic with ID: {} and stream with ID: {}",
182 self.topic_id,
183 self.stream_id,
184 );
185
186 Ok(())
187 }
188}