kf_protocol_message/
produce_handler.rs1use std::fmt::Debug;
2
3use kf_protocol::Decoder;
4use kf_protocol::Encoder;
5
6use kf_protocol_api::RecordSet;
7
8use crate::produce::{KfProduceResponse, KfProduceRequest};
9use crate::produce::TopicProduceData;
10use crate::produce::{PartitionProduceData, PartitionProduceResponse};
11
12pub type DefaultKfProduceRequest = KfProduceRequest<RecordSet>;
13pub type DefaultKfTopicRequest = TopicProduceData<RecordSet>;
14pub type DefaultKfPartitionRequest = PartitionProduceData<RecordSet>;
15
16impl <R>KfProduceRequest<R> where R: Encoder + Decoder + Debug {
21
22 pub fn find_partition_request(&self, topic: &str, partition: i32) -> Option<&PartitionProduceData<R>> {
24 if let Some(request) = self.topics.iter().find(|request| request.name == topic) {
25 request.partitions.iter().find( |part_request| part_request.partition_index == partition)
26 } else {
27 None
28 }
29 }
30}
31
32impl KfProduceResponse {
37
38 pub fn find_partition_response(&self, topic: &str, partition: i32) -> Option<&PartitionProduceResponse> {
40
41 if let Some(response) = self.responses.iter().find(|response| response.name == topic) {
42 response.partitions.iter().find( |part_response| part_response.partition_index == partition)
43 } else {
44 None
45 }
46 }
47}