yozefu_app/app.rs

//! This app is both a Kafka consumer and a Kafka admin client.
use lib::{
    ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicDetail,
    kafka::SchemaRegistryClient, search::offset::FromOffset,
};
use rdkafka::{
    Offset, TopicPartitionList,
    consumer::{BaseConsumer, Consumer, StreamConsumer},
};
use thousands::Separable;
use tracing::{info, warn};

use std::{collections::HashSet, fs, time::Duration};

use itertools::Itertools;

use crate::{
    configuration::{Configuration, ConsumerConfig, InternalConfig, YozefuConfig},
    search::{Search, ValidSearchQuery},
};

/// Struct exposing different functions for consuming Kafka records.
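///
/// A hedged usage sketch (construction of `InternalConfig` and
/// `ValidSearchQuery` is illustrative and depends on the surrounding crates):
/// ```ignore
/// let app = App::new(cluster, config, search_query);
/// let topics = app.list_topics()?;
/// let consumer = app.create_consumer(&topics)?;
/// ```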
#[derive(Debug, Clone)]
pub struct App {
    pub cluster: String,
    pub config: InternalConfig,
    pub search_query: ValidSearchQuery,
    //pub output_file: PathBuf,
}

impl App {
    pub fn new(
        cluster: String,
        config: InternalConfig,
        search_query: ValidSearchQuery,
        //     output_file: PathBuf,
    ) -> Self {
        Self {
            cluster,
            config,
            search_query,
            //    output_file,
        }
    }

    /// Returns a schema registry client if one is configured for the current cluster.
    pub fn schema_registry(&self) -> Option<SchemaRegistryClient> {
        self.config
            .schema_registry_config_of(&self.cluster)
            .map(|config| SchemaRegistryClient::new(config.url, &config.headers))
    }

    /// Creates a Kafka consumer for the given topics, starting from the
    /// offset specified by the search query (the end of the topics by default).
    pub fn create_consumer(&self, topics: &[String]) -> Result<StreamConsumer, Error> {
        let offset = self.search_query.offset().unwrap_or(FromOffset::End);
        match offset {
            FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning),
            FromOffset::End => self.assign_partitions(topics, Offset::End),
            FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)),
            FromOffset::OffsetTail(o) => self.assign_partitions(topics, Offset::OffsetTail(o)),
            FromOffset::Timestamp(timestamp) => {
                let consumer: StreamConsumer = self.config.create_kafka_consumer()?;
                let mut tp = TopicPartitionList::new();
                for t in topics {
                    let metadata = consumer.fetch_metadata(Some(t), Duration::from_secs(10))?;
                    for m in metadata.topics() {
                        for p in m.partitions() {
                            tp.add_partition(m.name(), p.id());
                        }
                    }
                }
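                // `offsets_for_times` expects the offset field of every entry to
                // carry a timestamp in milliseconds; it resolves each partition to
                // the earliest offset whose timestamp is >= the requested one.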
                tp.set_all_offsets(Offset::Offset(timestamp))?;
                let offsets = consumer.offsets_for_times(tp, Duration::from_secs(60))?;
                consumer.assign(&offsets)?;
                Ok(consumer)
            }
        }
    }

    /// Returns the consumer configuration of the current cluster.
    pub fn consumer_config(&self) -> ConsumerConfig {
        self.config.consumer_config(&self.cluster)
    }

    /// Exports a given Kafka record to a file.
    /// The name of the file is generated automatically at runtime.
    pub fn export_record(&self, record: &KafkaRecord) -> Result<(), Error> {
        let output_file = self.config.output_file();
        fs::create_dir_all(output_file.parent().unwrap())?;
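        // Exported records accumulate in a single JSON file; a missing or
        // unreadable file is treated as an empty list.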
        let content = fs::read_to_string(output_file).unwrap_or_else(|_| "[]".to_string());
        let mut exported_records: Vec<ExportedKafkaRecord> = serde_json::from_str(&content)?;

        let mut exported_record_kafka: ExportedKafkaRecord = record.into();
        exported_record_kafka.set_search_query(self.search_query.query());
        exported_records.push(exported_record_kafka);
        exported_records.sort_by(|a, b| {
            a.record
                .timestamp
                .cmp(&b.record.timestamp)
                .then(a.record.offset.cmp(&b.record.offset))
        });
        exported_records.dedup();
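        // Recompute, for every record, the time elapsed since the first
        // record and since the previous one.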
        if !exported_records.is_empty() {
            let first_ts = exported_records[0].record.timestamp;
            for i in 1..exported_records.len() {
                let previous_ts = exported_records[i - 1].record.timestamp;
                exported_records[i].compute_deltas_ms(first_ts, previous_ts);
            }
        }

        fs::write(
            output_file,
            serde_json::to_string_pretty(&exported_records)?,
        )?;
        info!(
            "A record has been exported to file '{}'",
            output_file.display()
        );
        Ok(())
    }

    /// Calculates an estimate of the number of records that are going to be read.
    /// This function is used to render a progress bar.
    pub fn estimate_number_of_records_to_read(
        &self,
        topic_partition_list: &TopicPartitionList,
    ) -> Result<i64, Error> {
        let client: StreamConsumer = self.create_unassigned_consumer()?;
        let mut count = 0;
        for t in topic_partition_list.elements() {
            // This call can be very slow.
            let watermarks: (i64, i64) =
                match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) {
                    Ok(i) => i,
                    Err(e) => {
                        warn!(
                            "I was not able to fetch watermarks of topic '{}', partition {}: {}",
                            t.topic(),
                            t.partition(),
                            e
                        );
                        (0, 0)
                    }
                };
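            // The low watermark is the first available offset and the high
            // watermark is the offset of the next record to be produced, so
            // their difference approximates the number of readable records.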
            count += match t.offset() {
                // The whole partition will be read.
                Offset::Beginning => watermarks.1 - watermarks.0,
                // Only records produced from now on will be read.
                Offset::End => 0,
                // No reliable estimate; count a single record as a placeholder.
                Offset::Stored | Offset::Invalid => 1,
                // Everything from the given offset up to the high watermark.
                Offset::Offset(o) => watermarks.1 - o,
                // The last `o` records of the partition.
                Offset::OffsetTail(o) => o,
            };
        }

        info!(
            "{} records are about to be consumed on the following topic partitions: [{}]",
            count.separate_with_underscores(),
            topic_partition_list
                .elements()
                .iter()
                .map(|e| format!("{}-{}", e.topic(), e.partition()))
                .join(", ")
        );
        Ok(count)
    }

    /// Creates a Kafka consumer without assigning any partitions to it.
    fn create_unassigned_consumer(&self) -> Result<StreamConsumer, Error> {
        self.config.create_kafka_consumer()
    }

    /// Creates a consumer and assigns it all the partitions of the given
    /// topics, positioned at the given offset.
    fn assign_partitions(
        &self,
        topics: &[String],
        offset: Offset,
    ) -> Result<StreamConsumer, Error> {
        let consumer = self.create_unassigned_consumer()?;
        let mut assignments = TopicPartitionList::new();
        for topic in topics {
            let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
            for t in metadata.topics() {
                for p in t.partitions() {
                    assignments.add_partition_offset(topic, p.id(), offset)?;
                }
            }
        }
        consumer.assign(&assignments)?;
        info!("New consumer created, about to consume {topics:?}");
        Ok(consumer)
    }

    /// Returns the topic details for a given list of topics.
    /// This function is not ready yet.
    pub fn topic_details(&self, topics: HashSet<String>) -> Result<Vec<TopicDetail>, Error> {
        let mut results = vec![];
        for topic in topics {
            let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
            let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?;
            // Metadata was requested for a single topic, so the first entry is the one we want.
            let metadata = metadata.topics().first().unwrap();
            let mut detail = TopicDetail {
                name: topic.clone(),
                replicas: metadata.partitions().first().unwrap().replicas().len(),
                partitions: metadata.partitions().len(),
                consumer_groups: vec![],
                count: self.count_records_in_topic(&topic)?,
            };
            let mut consumer_groups = vec![];
            let metadata = consumer.fetch_group_list(None, Duration::from_secs(10))?;
            for g in metadata.groups() {
                consumer_groups.push(ConsumerGroupDetail {
                    name: g.name().to_string(),
                    members: vec![], //Self::parse_members(g, g.members())?,
                    state: g.state().parse()?,
                });
            }

            detail.consumer_groups = consumer_groups;
            results.push(detail);
        }

        Ok(results)
    }

    /// Counts the records of a topic by summing the watermark differences
    /// of all its partitions.
    pub fn count_records_in_topic(&self, topic: &str) -> Result<i64, Error> {
        let mut count = 0;
        let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
        let Some(metadata_topic) = metadata.topics().first() else {
            return Ok(0);
        };
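        // Note that this is an upper bound: on compacted or partially
        // deleted topics, fewer records may actually remain.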
        for partition in metadata_topic.partitions() {
            let watermarks =
                consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?;
            count += watermarks.1 - watermarks.0;
        }

        Ok(count)
    }

    /// Lists available Kafka topics on the cluster.
    pub fn list_topics(&self) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = self.create_unassigned_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }

    // TODO https://github.com/fede1024/rust-rdkafka/pull/680
    //    pub fn parse_members(
    //        group: &GroupInfo,
    //        members: &[GroupMemberInfo],
    //    ) -> Result<Vec<ConsumerGroupMember>, anyhow::Error> {
    //        return Ok(vec![]);
    //        let members = members
    //            .iter()
    //            .map(|member| {
    //                let mut assigns = Vec::new();
    //                if group.protocol_type() == "consumer" {
    //                    if let Some(assignment) = member.assignment() {
    //                        let mut payload_rdr = Cursor::new(assignment);
    //                        assigns = Self::parse_member_assignment(&mut payload_rdr)
    //                            .expect("Parse member assignment failed");
    //                    }
    //                }
    //                ConsumerGroupMember {
    //                    member: member.id().to_owned(),
    //                    start_offset: 0,
    //                    end_offset: 0,
    //                    assignments: assigns,
    //                }
    //            })
    //            .collect::<Vec<_>>();
    //
    //        Ok(members)
    //    }
    //
    //    fn parse_member_assignment(
    //        payload_rdr: &mut Cursor<&[u8]>,
    //    ) -> Result<Vec<MemberAssignment>, anyhow::Error> {
    //        return Ok(vec![]);
    //        let _version = payload_rdr.read_i16::<BigEndian>()?;
    //        let assign_len = payload_rdr.read_i32::<BigEndian>()?;
    //        let mut assigns = Vec::with_capacity(assign_len as usize);
    //        for _ in 0..assign_len {
    //            let topic = read_str(payload_rdr)?.to_owned();
    //            let partition_len = payload_rdr.read_i32::<BigEndian>()?;
    //            let mut partitions = Vec::with_capacity(partition_len as usize);
    //            for _ in 0..partition_len {
    //                let partition = payload_rdr.read_i32::<BigEndian>()?;
    //                partitions.push(partition);
    //            }
    //            assigns.push(MemberAssignment { topic, partitions })
    //        }
    //        Ok(assigns)
    //    }

    /// Lists available topics on the cluster using a custom Kafka configuration.
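    ///
    /// A hedged sketch (the configuration value is illustrative):
    /// ```ignore
    /// let topics = App::list_topics_from_client(&yozefu_config)?;
    /// println!("{} topics found", topics.len());
    /// ```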
    pub fn list_topics_from_client(yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = yozefu_config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(3))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }
}

//fn read_str<'a>(rdr: &'a mut Cursor<&[u8]>) -> Result<&'a str, Error> {
//    let len = (rdr.read_i16::<BigEndian>())? as usize;
//    let pos = rdr.position() as usize;
//    let slice = str::from_utf8(&rdr.get_ref()[pos..(pos + len)])?;
//    rdr.consume(len);
//    Ok(slice)
//}
//