1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
use std::io::Error as IoError; use log::trace; use flv_future_aio::bytes::BytesMut; use kf_protocol_core::Encoder; use kf_protocol_core::Version; use kf_protocol_message::produce::KfProduceRequest; use kf_protocol_message::produce::TopicProduceData; use kf_protocol_message::produce::PartitionProduceData; use crate::FileWrite; use crate::StoreValue; use super::file_fetch::KfFileRecordSet; pub type FileProduceRequest = KfProduceRequest<KfFileRecordSet>; pub type FileTopicRequest = TopicProduceData<KfFileRecordSet>; pub type FilePartitionRequest = PartitionProduceData<KfFileRecordSet>; impl FileWrite for FileProduceRequest { fn file_encode( &self, src: &mut BytesMut, data: &mut Vec<StoreValue>, version: Version, ) -> Result<(), IoError> { trace!("file encoding produce request"); self.transactional_id.encode(src, version)?; self.acks.encode(src, version)?; self.timeout_ms.encode(src, version)?; self.topics.file_encode(src, data, version)?; Ok(()) } } impl FileWrite for FileTopicRequest { fn file_encode( &self, src: &mut BytesMut, data: &mut Vec<StoreValue>, version: Version, ) -> Result<(), IoError> { trace!("file encoding produce topic request"); self.name.encode(src, version)?; self.partitions.file_encode(src, data, version)?; Ok(()) } } impl FileWrite for FilePartitionRequest { fn file_encode( &self, src: &mut BytesMut, data: &mut Vec<StoreValue>, version: Version, ) -> Result<(), IoError> { trace!("file encoding for partition request"); self.partition_index.encode(src, version)?; self.records.file_encode(src, data, version)?; Ok(()) } }