iggy_cli/commands/binary_message/
flush_messages.rs1use crate::commands::cli_command::{CliCommand, PRINT_TARGET};
20use anyhow::{Context, Error};
21use async_trait::async_trait;
22use iggy_common::Client;
23use iggy_common::Identifier;
24use tracing::{Level, event};
25
26pub struct FlushMessagesCmd {
27 stream_id: Identifier,
28 topic_id: Identifier,
29 partition_id: u32,
30 fsync: bool,
31}
32
33impl FlushMessagesCmd {
34 pub fn new(
35 stream_id: Identifier,
36 topic_id: Identifier,
37 partition_id: u32,
38 fsync: bool,
39 ) -> Self {
40 Self {
41 stream_id,
42 topic_id,
43 partition_id,
44 fsync,
45 }
46 }
47}
48
49#[async_trait]
50impl CliCommand for FlushMessagesCmd {
51 fn explain(&self) -> String {
52 format!(
53 "flush messages from topic with ID: {} and stream with ID: {} (partition with ID: {}) {}",
54 self.topic_id,
55 self.stream_id,
56 self.partition_id,
57 if self.fsync {
58 "with fsync"
59 } else {
60 "without fsync"
61 },
62 )
63 }
64
65 async fn execute_cmd(&mut self, client: &dyn Client) -> anyhow::Result<(), Error> {
66 client
67 .flush_unsaved_buffer(
68 &self.stream_id,
69 &self.topic_id,
70 self.partition_id,
71 self.fsync,
72 )
73 .await
74 .with_context(|| {
75 format!(
76 "Problem flushing messages from topic with ID: {} and stream with ID: {} (partition with ID: {}) {}",
77 self.topic_id, self.stream_id, self.partition_id, if self.fsync { "with fsync" } else { "without fsync" },
78 )
79 })?;
80
81 event!(target: PRINT_TARGET, Level::INFO,
82 "Flushed messages from topic with ID: {} and stream with ID: {} (partition with ID: {}) {}",
83 self.topic_id,
84 self.stream_id,
85 self.partition_id,
86 if self.fsync { "with fsync" } else { "without fsync" },
87 );
88
89 Ok(())
90 }
91}