kf_protocol_message/
produce_handler.rs

1use std::fmt::Debug;
2
3use kf_protocol::Decoder;
4use kf_protocol::Encoder;
5
6use kf_protocol_api::RecordSet;
7
8use crate::produce::{KfProduceResponse, KfProduceRequest};
9use crate::produce::TopicProduceData;
10use crate::produce::{PartitionProduceData, PartitionProduceResponse};
11
/// `KfProduceRequest` with its generic record parameter fixed to `RecordSet`.
12pub type DefaultKfProduceRequest = KfProduceRequest<RecordSet>;
/// `TopicProduceData` with its generic record parameter fixed to `RecordSet`.
13pub type DefaultKfTopicRequest = TopicProduceData<RecordSet>;
/// `PartitionProduceData` with its generic record parameter fixed to `RecordSet`.
14pub type DefaultKfPartitionRequest = PartitionProduceData<RecordSet>;
15
16// -----------------------------------
17// Implementation - KfProduceRequest
18// -----------------------------------
19
20impl <R>KfProduceRequest<R> where R: Encoder + Decoder + Debug {
21
22    /// Find partition in request
23    pub fn find_partition_request(&self, topic: &str, partition: i32) -> Option<&PartitionProduceData<R>> {
24        if let Some(request) = self.topics.iter().find(|request| request.name == topic) {
25             request.partitions.iter().find( |part_request| part_request.partition_index == partition)
26        } else {
27            None
28        }
29    }
30}
31
32// -----------------------------------
33// Implementation - KfProduceResponse
34// -----------------------------------
35
36impl KfProduceResponse {
37
38    /// Find partition in Response
39    pub fn find_partition_response(&self, topic: &str, partition: i32) -> Option<&PartitionProduceResponse> {
40
41        if let Some(response) = self.responses.iter().find(|response| response.name == topic) {
42             response.partitions.iter().find( |part_response| part_response.partition_index == partition)
43        } else {
44            None
45        }
46    }
47}