Skip to main content

iggy_cli/commands/binary_message/
send_messages.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::Context;
21use async_trait::async_trait;
22use bytes::Bytes;
23use iggy_common::Client;
24use iggy_common::{HeaderKey, HeaderValue, Identifier, IggyMessage, Partitioning, Sizeable};
25use std::collections::BTreeMap;
26use std::io::{self, Read};
27use tokio::io::AsyncReadExt;
28use tracing::{Level, event};
29
30pub struct SendMessagesCmd {
31    stream_id: Identifier,
32    topic_id: Identifier,
33    partitioning: Partitioning,
34    messages: Option<Vec<String>>,
35    headers: Vec<(HeaderKey, HeaderValue)>,
36    input_file: Option<String>,
37}
38
39impl SendMessagesCmd {
40    pub fn new(
41        stream_id: Identifier,
42        topic_id: Identifier,
43        partition_id: Option<u32>,
44        message_key: Option<String>,
45        messages: Option<Vec<String>>,
46        headers: Vec<(HeaderKey, HeaderValue)>,
47        input_file: Option<String>,
48    ) -> Self {
49        let partitioning = match (partition_id, message_key) {
50            (Some(_), Some(_)) => unreachable!(),
51            (Some(partition_id), None) => Partitioning::partition_id(partition_id),
52            (None, Some(message_key)) => Partitioning::messages_key_str(message_key.as_str())
53                .unwrap_or_else(|_| {
54                    panic!("Failed to create Partitioning with {message_key} string message key")
55                }),
56            (None, None) => Partitioning::default(),
57        };
58        Self {
59            stream_id,
60            topic_id,
61            partitioning,
62            messages,
63            headers,
64            input_file,
65        }
66    }
67
68    fn read_message_from_stdin(&self) -> Result<String, io::Error> {
69        let mut buffer = String::new();
70
71        io::stdin().read_to_string(&mut buffer)?;
72
73        Ok(buffer)
74    }
75
76    fn get_headers(&self) -> Option<BTreeMap<HeaderKey, HeaderValue>> {
77        match self.headers.len() {
78            0 => None,
79            _ => Some(self.headers.iter().cloned().collect()),
80        }
81    }
82}
83
84#[async_trait]
85impl CliCommand for SendMessagesCmd {
86    fn explain(&self) -> String {
87        format!(
88            "send messages to topic with ID: {} and stream with ID: {}",
89            self.topic_id, self.stream_id
90        )
91    }
92
93    async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), anyhow::Error> {
94        let mut messages = if let Some(input_file) = &self.input_file {
95            let mut file = tokio::fs::OpenOptions::new()
96                .read(true)
97                .open(input_file)
98                .await
99                .with_context(|| format!("Problem opening file for reading: {input_file}"))?;
100            let mut buffer = Vec::new();
101            file.read_to_end(&mut buffer)
102                .await
103                .with_context(|| format!("Problem reading file: {input_file}"))?;
104
105            event!(target: PRINT_TARGET, Level::INFO,
106                "Read {} bytes from {} file", buffer.len(), input_file,
107            );
108
109            let mut messages: Vec<IggyMessage> = Vec::new();
110            let mut bytes_read = 0usize;
111            let all_messages_bytes: Bytes = buffer.into();
112
113            while bytes_read < all_messages_bytes.len() {
114                let message_bytes = all_messages_bytes.slice(bytes_read..);
115                let message = IggyMessage::from_bytes(message_bytes);
116                match message {
117                    Ok(message) => {
118                        let message_size = message.get_size_bytes().as_bytes_usize();
119                        messages.push(message);
120                        bytes_read += message_size;
121                    }
122                    Err(e) => {
123                        event!(target: PRINT_TARGET, Level::ERROR,
124                            "Failed to parse message from bytes: {e} at offset {bytes_read}",
125                        );
126                        break;
127                    }
128                }
129            }
130            event!(target: PRINT_TARGET, Level::INFO,
131                "Created {} messages using {bytes_read} bytes", messages.len(),
132            );
133
134            messages
135        } else {
136            let headers = self.get_headers();
137            match &self.messages {
138                Some(messages) => messages
139                    .iter()
140                    .map(|s| {
141                        IggyMessage::builder()
142                            .payload(Bytes::from(s.clone()))
143                            .user_headers(headers.clone().unwrap_or_default())
144                            .build()
145                    })
146                    .collect::<Result<Vec<_>, _>>()
147                    .with_context(|| "Failed to create messages from provided strings")?,
148                None => {
149                    let input = self.read_message_from_stdin()?;
150
151                    input
152                        .lines()
153                        .map(|m| {
154                            IggyMessage::builder()
155                                .payload(m.to_owned().into())
156                                .user_headers(headers.clone().unwrap_or_default())
157                                .build()
158                        })
159                        .collect::<Result<Vec<_>, _>>()
160                        .with_context(|| "Failed to create messages from stdin")?
161                }
162            }
163        };
164
165        client
166            .send_messages(
167                &self.stream_id,
168                &self.topic_id,
169                &self.partitioning,
170                &mut messages,
171            )
172            .await
173            .with_context(|| {
174                format!(
175                    "Problem sending messages to topic with ID: {} and stream with ID: {}",
176                    self.topic_id, self.stream_id
177                )
178            })?;
179
180        event!(target: PRINT_TARGET, Level::INFO,
181            "Sent messages to topic with ID: {} and stream with ID: {}",
182            self.topic_id,
183            self.stream_id,
184        );
185
186        Ok(())
187    }
188}