use lib::{
    ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicDetail,
    kafka::SchemaRegistryClient, search::offset::FromOffset,
};
use log::{info, warn};
use rdkafka::{
    Offset, TopicPartitionList,
    consumer::{BaseConsumer, Consumer, StreamConsumer},
};
use thousands::Separable;

use std::{collections::HashSet, fs, time::Duration};

use itertools::Itertools;

use crate::{
    configuration::{Configuration, InternalConfig, YozefuConfig},
    search::{Search, ValidSearchQuery},
};

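/// Context shared by all Kafka operations: the name of the target cluster,
/// its resolved configuration, and the validated search query.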
#[derive(Debug, Clone)]
pub struct App {
    pub cluster: String,
    pub config: InternalConfig,
    pub search_query: ValidSearchQuery,
}

impl App {
    pub fn new(
        cluster: String,
        config: InternalConfig,
        search_query: ValidSearchQuery,
    ) -> Self {
        Self {
            cluster,
            config,
            search_query,
        }
    }

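    /// Returns a schema registry client when one is configured for the
    /// current cluster, `None` otherwise.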
    pub fn schema_registry(&self) -> Option<SchemaRegistryClient> {
        self.config
            .schema_registry_config_of(&self.cluster)
            .map(|config| SchemaRegistryClient::new(config.url, &config.headers))
    }

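    /// Creates a consumer assigned to every partition of the given topics.
    /// The starting position comes from the search query and defaults to the
    /// end of the topics. When starting from a timestamp, the matching
    /// offsets are first resolved with `offsets_for_times`.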
    pub fn create_consumer(&self, topics: &[String]) -> Result<StreamConsumer, Error> {
        let offset = self.search_query.offset().unwrap_or(FromOffset::End);
        match offset {
            FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning),
            FromOffset::End => self.assign_partitions(topics, Offset::End),
            FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)),
            FromOffset::OffsetTail(o) => self.assign_partitions(topics, Offset::OffsetTail(o)),
            FromOffset::Timestamp(timestamp) => {
                let consumer: StreamConsumer = self.config.create_kafka_consumer()?;
                let mut assignments = TopicPartitionList::new();
                for topic in topics {
                    let metadata =
                        consumer.fetch_metadata(Some(topic.as_str()), Duration::from_secs(10))?;
                    for t in metadata.topics() {
                        for p in t.partitions() {
                            assignments.add_partition(t.name(), p.id());
                        }
                    }
                }
                assignments.set_all_offsets(Offset::Offset(timestamp))?;
                let offsets = consumer.offsets_for_times(assignments, Duration::from_secs(60))?;
                consumer.assign(&offsets)?;
                Ok(consumer)
            }
        }
    }

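    /// Appends a record to the JSON export file. Exported records are kept
    /// sorted by timestamp (then by offset), deduplicated, and their time
    /// deltas relative to the first and previous records are recomputed.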
    pub fn export_record(&self, record: &KafkaRecord) -> Result<(), Error> {
        let output_file = self.config.output_file();
        fs::create_dir_all(output_file.parent().unwrap())?;
        let content = fs::read_to_string(&output_file).unwrap_or_else(|_| "[]".to_string());
        let mut exported_records: Vec<ExportedKafkaRecord> = serde_json::from_str(&content)?;

        let mut exported_record: ExportedKafkaRecord = record.into();
        exported_record.set_search_query(self.search_query.query());
        exported_records.push(exported_record);
        exported_records.sort_by(|a, b| {
            a.record
                .timestamp
                .cmp(&b.record.timestamp)
                .then(a.record.offset.cmp(&b.record.offset))
        });
        exported_records.dedup();
        let first_ts = exported_records.first().unwrap().record.timestamp;
        for i in 1..exported_records.len() {
            let previous_ts = exported_records[i - 1].record.timestamp;
            exported_records[i].compute_deltas_ms(first_ts, previous_ts);
        }

        fs::write(
            &output_file,
            serde_json::to_string_pretty(&exported_records)?,
        )?;
        info!(
            "A record has been exported to file '{}'",
            output_file.display()
        );
        Ok(())
    }

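    /// Estimates how many records are about to be consumed for the given
    /// assignments, based on the low and high watermarks of each partition.
    /// Partitions whose watermarks cannot be fetched fall back to `(0, 0)`.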
    pub fn estimate_number_of_records_to_read(
        &self,
        topic_partition_list: TopicPartitionList,
    ) -> Result<i64, Error> {
        let client = self.create_unassigned_consumer()?;
        let mut count = 0;
        for t in topic_partition_list.elements() {
            let watermarks: (i64, i64) =
                match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) {
                    Ok(i) => i,
                    Err(e) => {
                        warn!(
                            "I was not able to fetch watermarks of topic '{}', partition {}: {}",
                            t.topic(),
                            t.partition(),
                            e
                        );
                        (0, 0)
                    }
                };
            count += match t.offset() {
                Offset::Beginning => watermarks.1 - watermarks.0,
                Offset::End => 0,
                Offset::Stored | Offset::Invalid => 1,
                Offset::Offset(o) => watermarks.1 - o,
                Offset::OffsetTail(o) => o,
            };
        }

        info!(
            "{} records are about to be consumed on the following topic partitions: [{}]",
            count.separate_with_underscores(),
            topic_partition_list
                .elements()
                .iter()
                .map(|e| format!("{}-{}", e.topic(), e.partition()))
                .join(", ")
        );
        Ok(count)
    }

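    /// Creates a consumer from the cluster configuration without assigning
    /// any partition to it.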
    fn create_unassigned_consumer(&self) -> Result<StreamConsumer, Error> {
        self.config.create_kafka_consumer()
    }

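    /// Creates a consumer and assigns it every partition of the given
    /// topics, starting at the given offset.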
    fn assign_partitions(
        &self,
        topics: &[String],
        offset: Offset,
    ) -> Result<StreamConsumer, Error> {
        let consumer = self.create_unassigned_consumer()?;
        let mut assignments = TopicPartitionList::new();
        for topic in topics {
            let metadata =
                consumer.fetch_metadata(Some(topic.as_str()), Duration::from_secs(10))?;
            for t in metadata.topics() {
                for p in t.partitions() {
                    assignments.add_partition_offset(topic, p.id(), offset)?;
                }
            }
        }
        consumer.assign(&assignments)?;
        info!("New consumer created, about to consume {:?}", topics);
        Ok(consumer)
    }

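    /// Gathers details about every given topic: replication factor, number
    /// of partitions, estimated record count and the consumer groups known
    /// to the cluster.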
    pub fn topic_details(&self, topics: HashSet<String>) -> Result<Vec<TopicDetail>, Error> {
        let mut results = vec![];
        let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
        for topic in topics {
            let metadata =
                consumer.fetch_metadata(Some(topic.as_str()), Duration::from_secs(10))?;
            let topic_metadata = metadata.topics().first().unwrap();
            let mut detail = TopicDetail {
                name: topic.clone(),
                replicas: topic_metadata.partitions().first().unwrap().replicas().len(),
                partitions: topic_metadata.partitions().len(),
                consumer_groups: vec![],
                count: self.count_records_in_topic(&topic)?,
            };
            let mut consumer_groups = vec![];
            let group_list = consumer.fetch_group_list(None, Duration::from_secs(10))?;
            for g in group_list.groups() {
                consumer_groups.push(ConsumerGroupDetail {
                    name: g.name().to_string(),
                    members: vec![],
                    state: g.state().parse()?,
                });
            }

            detail.consumer_groups = consumer_groups;
            results.push(detail);
        }

        Ok(results)
    }

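    /// Estimates the number of records in a topic by summing the difference
    /// between the high and low watermarks of every partition.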
    pub fn count_records_in_topic(&self, topic: &str) -> Result<i64, Error> {
        let mut count = 0;
        let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
        let Some(metadata_topic) = metadata.topics().first() else {
            return Ok(0);
        };
        for partition in metadata_topic.partitions() {
            let watermarks =
                consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?;
            count += watermarks.1 - watermarks.0;
        }

        Ok(count)
    }

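    /// Lists the names of all the topics of the cluster.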
    pub fn list_topics(&self) -> Result<Vec<String>, Error> {
        let consumer = self.create_unassigned_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }

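    /// Same as [`Self::list_topics`] but built from a standalone
    /// configuration, with a shorter metadata timeout of 3 seconds.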
    pub fn list_topics_from_client(yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = yozefu_config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(3))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }
}