1use lib::{
3 ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicConfig, TopicDetail,
4 kafka::SchemaRegistryClient, search::offset::FromOffset,
5};
6use rdkafka::{
7 Offset, TopicPartitionList,
8 consumer::{BaseConsumer, Consumer, StreamConsumer},
9};
10use thousands::Separable;
11use tracing::{info, warn};
12
13use std::{collections::HashSet, fs, time::Duration};
14
15use itertools::Itertools;
16
17use crate::{
18 AdminClient,
19 configuration::{Configuration, ConsumerConfig, InternalConfig, YozefuConfig},
20 search::{Search, ValidSearchQuery},
21};
22
/// Entry point for interacting with one Kafka cluster: bundles the cluster
/// name, the resolved configuration, and the validated search query.
#[derive(Debug, Clone)]
pub struct App {
    /// Name of the Kafka cluster to operate on, as declared in the configuration.
    pub cluster: String,
    /// Resolved internal configuration (consumer settings, output file, ...).
    pub config: InternalConfig,
    /// Validated search query driving what/where to consume.
    pub search_query: ValidSearchQuery,
}
31
32impl App {
33 pub fn new(
34 cluster: String,
35 config: InternalConfig,
36 search_query: ValidSearchQuery,
37 ) -> Self {
39 Self {
40 cluster,
41 config,
42 search_query,
43 }
45 }
46
47 pub fn schema_registry(&self) -> Option<SchemaRegistryClient> {
48 match self.config.schema_registry_config_of(&self.cluster) {
49 Some(config) => Some(SchemaRegistryClient::new(config.url, &config.headers)),
50 None => None,
51 }
52 }
53
    /// Creates a `StreamConsumer` positioned according to the search query's
    /// starting point (defaults to the end of the topics when none is given).
    ///
    /// Absolute/relative offsets are assigned directly via `assign_partitions`;
    /// for `Timestamp`, per-partition offsets are resolved with
    /// `offsets_for_times` first.
    pub fn create_consumer(&self, topics: &Vec<String>) -> Result<StreamConsumer, Error> {
        // No explicit starting point in the query => consume only new records.
        let offset = self.search_query.offset().unwrap_or(FromOffset::End);
        match offset {
            FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning),
            FromOffset::End => self.assign_partitions(topics, Offset::End),
            FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)),
            FromOffset::OffsetTail(o) => self.assign_partitions(topics, Offset::OffsetTail(o)),
            FromOffset::Timestamp(timestamp) => {
                let consumer: StreamConsumer = self.config.create_kafka_consumer()?;
                // Collect every partition of every requested topic.
                let mut tp = TopicPartitionList::new();
                for t in topics {
                    let metadata = consumer.fetch_metadata(Some(t), Duration::from_secs(10))?;
                    for m in metadata.topics() {
                        for p in m.partitions() {
                            tp.add_partition(m.name(), p.id());
                        }
                    }
                }
                // rdkafka convention: for `offsets_for_times` the "offset" of
                // each entry carries the timestamp (ms) to look up.
                tp.set_all_offsets(Offset::Offset(timestamp))?;
                let tt = consumer.offsets_for_times(tp, Duration::from_secs(60))?;
                consumer.assign(&tt)?;
                Ok(consumer)
            }
        }
    }
80
81 pub fn consumer_config(&self) -> ConsumerConfig {
82 self.config.consumer_config(&self.cluster)
83 }
84
85 pub fn export_record(&self, record: &KafkaRecord) -> Result<(), Error> {
88 let output_file = self.config.output_file();
89 fs::create_dir_all(output_file.parent().unwrap())?;
90 let content = fs::read_to_string(output_file).unwrap_or("[]".to_string());
91 let mut exported_records: Vec<ExportedKafkaRecord> = serde_json::from_str(&content)?;
92
93 let mut exported_record_kafka: ExportedKafkaRecord = record.into();
94 exported_record_kafka.set_search_query(self.search_query.query());
95 exported_records.push(exported_record_kafka);
96 exported_records.sort_by(|a, b| {
97 a.record
98 .timestamp
99 .cmp(&b.record.timestamp)
100 .then(a.record.offset.cmp(&b.record.offset))
101 });
102 exported_records.dedup();
103 for i in 1..exported_records.len() {
104 let first_ts = exported_records.first().unwrap().record.timestamp;
105 let previous_ts = exported_records.get(i - 1).unwrap().record.timestamp;
106 let current = exported_records.get_mut(i).unwrap();
107 current.compute_deltas_ms(first_ts, previous_ts);
108 }
109
110 fs::write(
111 output_file,
112 serde_json::to_string_pretty(&exported_records)?,
113 )?;
114 info!(
115 "A record has been exported into file '{}'",
116 output_file.display()
117 );
118 Ok(())
119 }
120
121 pub fn estimate_number_of_records_to_read(
124 &self,
125 topic_partition_list: &TopicPartitionList,
126 ) -> Result<i64, Error> {
127 let client: StreamConsumer = self.create_assigned_consumer()?;
128 let mut count = 0;
129 for t in topic_partition_list.elements() {
130 let watermarks: (i64, i64) =
132 match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) {
133 Ok(i) => i,
134 Err(e) => {
135 warn!(
136 "I was not able to fetch watermarks of topic '{}', partition {}: {}",
137 t.partition(),
138 t.topic(),
139 e
140 );
141 (0, 0)
142 }
143 };
144 count += match t.offset() {
145 Offset::Beginning => watermarks.1 - watermarks.0,
146 Offset::End => 0,
147 Offset::Stored => 1,
148 Offset::Invalid => 1,
149 Offset::Offset(o) => watermarks.1 - o,
150 Offset::OffsetTail(o) => o,
151 }
152 }
153
154 info!(
155 "{} records are about to be consumed from the following topic partitions: [{}]",
156 count.separate_with_underscores(),
157 topic_partition_list
158 .elements()
159 .iter()
160 .map(|e| format!("{}-{}", e.topic(), e.partition()))
161 .join(", ")
162 );
163 Ok(count)
164 }
165
166 fn create_assigned_consumer(&self) -> Result<StreamConsumer, Error> {
167 self.config.create_kafka_consumer()
168 }
169
170 fn assign_partitions(
172 &self,
173 topics: &Vec<String>,
174 offset: Offset,
175 ) -> Result<StreamConsumer, Error> {
176 let consumer = self.create_assigned_consumer()?;
177 let mut assignments = TopicPartitionList::new();
178 for topic in topics {
179 let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
180 for t in metadata.topics() {
181 for p in t.partitions() {
182 assignments.add_partition_offset(topic, p.id(), offset)?;
183 }
184 }
185 }
186 consumer.assign(&assignments)?;
187 info!("New Consumer created, about to consume {topics:?}");
188 Ok(consumer)
189 }
190
191 pub fn topic_details(&self, topics: HashSet<String>) -> Result<Vec<TopicDetail>, Error> {
194 let mut results = vec![];
195 for topic in topics {
196 let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
197 let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?;
198 let metadata = metadata.topics().first().unwrap();
199 let mut detail = TopicDetail {
200 name: topic.clone(),
201 replicas: metadata.partitions().first().unwrap().replicas().len(),
202 partitions: metadata.partitions().len(),
203 consumer_groups: vec![],
204 count: self.count_records_in_topic(&topic)?,
205 config: None,
206 };
207 let mut consumer_groups = vec![];
208 let metadata = consumer.fetch_group_list(None, Duration::from_secs(10))?;
209 for g in metadata.groups() {
210 consumer_groups.push(ConsumerGroupDetail {
211 name: g.name().to_string(),
212 members: vec![], state: g.state().parse()?,
214 });
215 }
216 detail.consumer_groups = consumer_groups;
217 results.push(detail);
218 }
219
220 Ok(results)
221 }
222
223 pub async fn topic_config_of(&self, topic: &str) -> Result<Option<TopicConfig>, Error> {
224 AdminClient::new(self.config.client_config())?
225 .topic_config(topic)
226 .await
227 }
228
229 pub fn count_records_in_topic(&self, topic: &str) -> Result<i64, Error> {
230 let mut count = 0;
231 let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
232 let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
233 let metadata_topic = metadata.topics().first();
234 if metadata_topic.is_none() {
235 return Ok(0);
236 }
237
238 let metadata_topic = metadata_topic.unwrap();
239 for partition in metadata_topic.partitions() {
240 let watermarks =
241 consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?;
242 count += watermarks.1 - watermarks.0;
243 }
244
245 Ok(count)
246 }
247
248 pub fn list_topics(&self) -> Result<Vec<String>, Error> {
250 let consumer: StreamConsumer = self.create_assigned_consumer()?;
251 let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
252 let topics = metadata
253 .topics()
254 .iter()
255 .map(|t| t.name().to_string())
256 .collect_vec();
257 Ok(topics)
258 }
259
260 pub fn list_topics_from_client(yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
311 let consumer: StreamConsumer = yozefu_config.create_kafka_consumer()?;
312 let metadata = consumer.fetch_metadata(None, Duration::from_secs(3))?;
313 let topics = metadata
314 .topics()
315 .iter()
316 .map(|t| t.name().to_string())
317 .collect_vec();
318 Ok(topics)
319 }
320}
321
322