1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use clap::{ArgGroup, Parser};
use rdkafka::ClientConfig;
use std::net::{SocketAddr, ToSocketAddrs};
use crate::constants::{DEFAULT_HTTP_HOST, DEFAULT_HTTP_HOST_PORT, DEFAULT_HTTP_PORT};
/// Command Line Interface, defined via the declarative,
/// `derive` based functionality of the `clap` crate.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
#[command(group(
ArgGroup::new("logging_flags")
.required(false)
.multiple(false)
.args(["verbose", "quiet"]),
))]
pub struct Cli {
// ------------------------------------------------------------------ Admin Client configuration
/// Initial Kafka Brokers to connect to (format: 'HOST:PORT,...').
///
/// Equivalent to '--config=bootstrap.servers:host:port,...'.
#[arg(short, long = "brokers", value_name = "BOOTSTRAP_BROKERS")]
pub bootstrap_brokers: String,
/// Client identifier used by the internal Kafka (Admin) Client.
///
/// Equivalent to '--config=client.id:my-client-id'.
#[arg(long = "client-id", value_name = "CLIENT_ID", default_value = env!("CARGO_PKG_NAME"))]
pub client_id: String,
/// Additional configuration used by the internal Kafka (Admin) Client (format: 'CONF_KEY:CONF_VAL').
///
/// To set multiple configurations keys, use this argument multiple times.
/// See: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
#[arg(
long = "kafka-conf",
value_name = "CONF_KEY:CONF_VAL",
value_parser = kv_clap_value_parser,
verbatim_doc_comment
)]
pub kafka_config: Vec<KVPair>,
/// Override identifier of the monitored Kafka Cluster.
///
/// If set, it replaces the value `cluster.id` from the Brokers' configuration.
/// This can be useful when `cluster.id` is not actually set.
#[arg(long = "cluster-id", value_name = "CLUSTER_ID")]
pub cluster_id: Option<String>,
/// For each Topic Partition, how much history of offsets to track in memory.
///
/// Offsets data points are collected every 500ms, on average: so, on average,
/// 30 minutes of data points is 3600 offsets, assuming partition offsets are
/// regularly produced to.
///
/// Once this limit is reached, the oldest data points are discarded, realising
/// a "moving window" of offsets history.
#[arg(long = "history", value_name = "SIZE", default_value = "3600", verbatim_doc_comment)]
pub offsets_history: usize,
/// Address to listen on (i.e. bind to), to receive HTTP requests.
///
/// In addition to the canonical 'HOST:PORT' format, it also allows for:
///
/// * ':PORT' / 'PORT' (assumes default 'HOST')
/// * 'HOST:' / 'HOST' (assumes default 'PORT')
/// * ':' (fallback on default)
#[arg(
short,
long = "listen-on",
value_name = "HOST:PORT",
value_parser = socketaddr_value_parser,
default_value = DEFAULT_HTTP_HOST_PORT,
verbatim_doc_comment
)]
pub listen_on: SocketAddr,
/// Verbose logging.
///
/// * none = 'WARN'
/// * '-v' = 'INFO'
/// * '-vv' = 'DEBUG'
/// * '-vvv' = 'TRACE'
///
/// Alternatively, set environment variable 'KOMMITTED_LOG=(ERROR|WARN|INFO|DEBUG|TRACE|OFF)'.
#[arg(short, long, action = clap::ArgAction::Count, verbatim_doc_comment)]
pub verbose: u8,
/// Quiet logging.
///
/// * none = 'WARN'
/// * '-q' = 'ERROR'
/// * '-qq' = 'OFF'
///
/// Alternatively, set environment variable 'KOMMITTED_LOG=(ERROR|WARN|INFO|DEBUG|TRACE|OFF)'.
#[arg(short, long, action = clap::ArgAction::Count, verbatim_doc_comment)]
pub quiet: u8,
}
impl Cli {
pub fn parse_and_validate() -> Self {
// TODO Implement a proper validation
Self::parse()
}
pub fn verbosity_level(&self) -> i8 {
self.verbose as i8 - self.quiet as i8
}
pub fn build_client_config(&self) -> ClientConfig {
let mut config = ClientConfig::new();
config
.set("bootstrap.servers", self.bootstrap_brokers.clone())
.set("client.id", self.client_id.clone());
for cfg in &self.kafka_config {
config.set(cfg.0.clone(), cfg.1.clone());
}
trace!("Created:\n{:#?}", config);
config
}
}
/// A simple (key,value) pair of `String`s, useful to be parsed from arguments via [`kv_clap_value_parser`].
pub type KVPair = (String, String);
/// To be used as [`clap::value_parser`] function to create [`KVPair`] values.
fn kv_clap_value_parser(kv: &str) -> Result<KVPair, String> {
let (k, v) = match kv.split_once(':') {
None => {
return Err("Should have 'K:V' format".to_string());
},
Some((k, v)) => (k, v),
};
Ok((k.to_string(), v.to_string()))
}
/// To be used as [`clap::value_parser`] function to create a [`SocketAddr`].
fn socketaddr_value_parser(socket_addr: &str) -> Result<SocketAddr, String> {
let socket_addr_normalized = if socket_addr.is_empty() || socket_addr == ":" {
DEFAULT_HTTP_HOST_PORT.to_string()
} else if socket_addr.starts_with(':') {
format!("{DEFAULT_HTTP_HOST}{socket_addr}")
} else if socket_addr.ends_with(':') {
format!("{socket_addr}{DEFAULT_HTTP_PORT}")
} else if socket_addr.parse::<u16>().is_ok() {
format!("{DEFAULT_HTTP_HOST}:{socket_addr}")
} else if socket_addr.contains(':') {
socket_addr.to_string()
} else {
format!("{socket_addr}:{DEFAULT_HTTP_PORT}")
};
match socket_addr_normalized.to_socket_addrs() {
Ok(mut iter) => iter.next().ok_or(format!("Unable to parse address '{socket_addr}'")),
Err(e) => Err(format!("Failed to parse address '{socket_addr}': {e}")),
}
}