use lib::{
    ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicDetail,
    kafka::SchemaRegistryClient, search::offset::FromOffset,
};
use rdkafka::{
    Offset, TopicPartitionList,
    consumer::{BaseConsumer, Consumer, StreamConsumer},
};
use thousands::Separable;
use tracing::{info, warn};

use std::{collections::HashSet, fs, time::Duration};

use itertools::Itertools;

use crate::{
    configuration::{Configuration, ConsumerConfig, InternalConfig, YozefuConfig},
    search::{Search, ValidSearchQuery},
};

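/// Core handle of the application: it carries the target cluster name,
/// the resolved configuration, and the validated search query, and exposes
/// the Kafka operations built on top of them.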
#[derive(Debug, Clone)]
pub struct App {
    pub cluster: String,
    pub config: InternalConfig,
    pub search_query: ValidSearchQuery,
}

impl App {
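    /// Creates a new `App` for the given cluster.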
    pub fn new(
        cluster: String,
        config: InternalConfig,
        search_query: ValidSearchQuery,
    ) -> Self {
        Self {
            cluster,
            config,
            search_query,
        }
    }

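    /// Returns a schema registry client when one is configured for the current cluster.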
    pub fn schema_registry(&self) -> Option<SchemaRegistryClient> {
        self.config
            .schema_registry_config_of(&self.cluster)
            .map(|config| SchemaRegistryClient::new(config.url, &config.headers))
    }

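    /// Creates a Kafka consumer assigned to every partition of the given topics,
    /// positioned according to the offset of the search query (default: end).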
    pub fn create_consumer(&self, topics: &[String]) -> Result<StreamConsumer, Error> {
        let offset = self.search_query.offset().unwrap_or(FromOffset::End);
        match offset {
            FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning),
            FromOffset::End => self.assign_partitions(topics, Offset::End),
            FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)),
            FromOffset::OffsetTail(o) => self.assign_partitions(topics, Offset::OffsetTail(o)),
            FromOffset::Timestamp(timestamp) => {
                let consumer: StreamConsumer = self.config.create_kafka_consumer()?;
                let mut tp = TopicPartitionList::new();
                for t in topics {
                    let metadata = consumer.fetch_metadata(Some(t), Duration::from_secs(10))?;
                    for m in metadata.topics() {
                        for p in m.partitions() {
                            tp.add_partition(m.name(), p.id());
                        }
                    }
                }
                // `offsets_for_times` expects the target timestamp in the offset field.
                tp.set_all_offsets(Offset::Offset(timestamp))?;
                let tt = consumer.offsets_for_times(tp, Duration::from_secs(60))?;
                consumer.assign(&tt)?;
                Ok(consumer)
            }
        }
    }

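    /// Returns the consumer configuration for the current cluster.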
    pub fn consumer_config(&self) -> ConsumerConfig {
        self.config.consumer_config(&self.cluster)
    }

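    /// Appends a record to the JSON export file, keeping the file sorted,
    /// deduplicated, and annotated with timestamp deltas.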
    pub fn export_record(&self, record: &KafkaRecord) -> Result<(), Error> {
        let output_file = self.config.output_file();
        fs::create_dir_all(output_file.parent().unwrap())?;
        let content = fs::read_to_string(&output_file).unwrap_or_else(|_| "[]".to_string());
        let mut exported_records: Vec<ExportedKafkaRecord> = serde_json::from_str(&content)?;

        let mut exported_record_kafka: ExportedKafkaRecord = record.into();
        exported_record_kafka.set_search_query(self.search_query.query());
        exported_records.push(exported_record_kafka);
        exported_records.sort_by(|a, b| {
            a.record
                .timestamp
                .cmp(&b.record.timestamp)
                .then(a.record.offset.cmp(&b.record.offset))
        });
        exported_records.dedup();
        for i in 1..exported_records.len() {
            let first_ts = exported_records.first().unwrap().record.timestamp;
            let previous_ts = exported_records.get(i - 1).unwrap().record.timestamp;
            let current = exported_records.get_mut(i).unwrap();
            current.compute_deltas_ms(first_ts, previous_ts);
        }

        fs::write(&output_file, serde_json::to_string_pretty(&exported_records)?)?;
        info!(
            "A record has been exported to file '{}'",
            output_file.display()
        );
        Ok(())
    }

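    /// Estimates the number of records that will be consumed from the given
    /// topic partitions, based on their low/high watermarks and the assigned offsets.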
    pub fn estimate_number_of_records_to_read(
        &self,
        topic_partition_list: &TopicPartitionList,
    ) -> Result<i64, Error> {
        let client: StreamConsumer = self.create_assigned_consumer()?;
        let mut count = 0;
        for t in topic_partition_list.elements() {
            let watermarks: (i64, i64) =
                match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) {
                    Ok(i) => i,
                    Err(e) => {
                        warn!(
                            "I was not able to fetch watermarks of topic '{}', partition {}: {}",
                            t.topic(),
                            t.partition(),
                            e
                        );
                        (0, 0)
                    }
                };
            count += match t.offset() {
                Offset::Beginning => watermarks.1 - watermarks.0,
                Offset::End => 0,
                Offset::Stored => 1,
                Offset::Invalid => 1,
                Offset::Offset(o) => watermarks.1 - o,
                Offset::OffsetTail(o) => o,
            };
        }

        info!(
            "{} records are about to be consumed on the following topic partitions: [{}]",
            count.separate_with_underscores(),
            topic_partition_list
                .elements()
                .iter()
                .map(|e| format!("{}-{}", e.topic(), e.partition()))
                .join(", ")
        );
        Ok(count)
    }

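    /// Creates a plain Kafka consumer for the current cluster; callers are
    /// responsible for assigning partitions to it.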
    fn create_assigned_consumer(&self) -> Result<StreamConsumer, Error> {
        self.config.create_kafka_consumer()
    }

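    /// Assigns every partition of the given topics to a new consumer, all
    /// starting at the provided offset.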
    fn assign_partitions(
        &self,
        topics: &[String],
        offset: Offset,
    ) -> Result<StreamConsumer, Error> {
        let consumer = self.create_assigned_consumer()?;
        let mut assignments = TopicPartitionList::new();
        for topic in topics {
            let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
            for t in metadata.topics() {
                for p in t.partitions() {
                    assignments.add_partition_offset(topic, p.id(), offset)?;
                }
            }
        }
        consumer.assign(&assignments)?;
        info!("New consumer created, about to consume {topics:?}");
        Ok(consumer)
    }

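    /// Gathers details (replication factor, partition count, record count, and
    /// consumer groups) for each of the given topics.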
    pub fn topic_details(&self, topics: HashSet<String>) -> Result<Vec<TopicDetail>, Error> {
        let mut results = vec![];
        for topic in topics {
            let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
            let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?;
            let topic_metadata = metadata.topics().first().unwrap();
            let mut detail = TopicDetail {
                name: topic.clone(),
                replicas: topic_metadata.partitions().first().unwrap().replicas().len(),
                partitions: topic_metadata.partitions().len(),
                consumer_groups: vec![],
                count: self.count_records_in_topic(&topic)?,
            };
            let mut consumer_groups = vec![];
            let group_list = consumer.fetch_group_list(None, Duration::from_secs(10))?;
            for g in group_list.groups() {
                consumer_groups.push(ConsumerGroupDetail {
                    name: g.name().to_string(),
                    members: vec![],
                    state: g.state().parse()?,
                });
            }

            detail.consumer_groups = consumer_groups;
            results.push(detail);
        }

        Ok(results)
    }

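    /// Counts the records in a topic by summing, for every partition, the
    /// difference between its high and low watermarks.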
    pub fn count_records_in_topic(&self, topic: &str) -> Result<i64, Error> {
        let mut count = 0;
        let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
        let Some(metadata_topic) = metadata.topics().first() else {
            return Ok(0);
        };
        for partition in metadata_topic.partitions() {
            let watermarks =
                consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?;
            count += watermarks.1 - watermarks.0;
        }

        Ok(count)
    }

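    /// Lists the names of all topics available on the current cluster.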
    pub fn list_topics(&self) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = self.create_assigned_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }

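    /// Lists the names of all topics reachable with the given configuration,
    /// without requiring a full `App` instance.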
    pub fn list_topics_from_client(yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = yozefu_config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(3))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }
}