Skip to main content

iggy_cli/commands/binary_message/
flush_messages.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::{Context, Error};
21use async_trait::async_trait;
22use iggy_common::Client;
23use iggy_common::Identifier;
24use tracing::{Level, event};
25
26pub struct FlushMessagesCmd {
27    stream_id: Identifier,
28    topic_id: Identifier,
29    partition_id: u32,
30    fsync: bool,
31}
32
33impl FlushMessagesCmd {
34    pub fn new(
35        stream_id: Identifier,
36        topic_id: Identifier,
37        partition_id: u32,
38        fsync: bool,
39    ) -> Self {
40        Self {
41            stream_id,
42            topic_id,
43            partition_id,
44            fsync,
45        }
46    }
47}
48
49#[async_trait]
50impl CliCommand for FlushMessagesCmd {
51    fn explain(&self) -> String {
52        format!(
53            "flush messages from topic with ID: {} and stream with ID: {} (partition with ID: {}) {}",
54            self.topic_id,
55            self.stream_id,
56            self.partition_id,
57            if self.fsync {
58                "with fsync"
59            } else {
60                "without fsync"
61            },
62        )
63    }
64
65    async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), Error> {
66        client
67            .flush_unsaved_buffer(
68                &self.stream_id,
69                &self.topic_id,
70                self.partition_id,
71                self.fsync,
72            )
73            .await
74            .with_context(|| {
75                format!(
76                    "Problem flushing messages from topic with ID: {} and stream with ID: {} (partition with ID: {}) {}",
77                    self.topic_id, self.stream_id, self.partition_id, if self.fsync { "with fsync" } else { "without fsync" },
78                )
79            })?;
80
81        event!(target: PRINT_TARGET, Level::INFO,
82            "Flushed messages from topic with ID: {} and stream with ID: {} (partition with ID: {}) {}",
83            self.topic_id,
84            self.stream_id,
85            self.partition_id,
86            if self.fsync { "with fsync" } else { "without fsync" },
87        );
88
89        Ok(())
90    }
91}