1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#![feature(generators)]


mod file_fetch;
mod file_produce;



pub use file_fetch::FilePartitionResponse;
pub use file_fetch::FileFetchResponse;
pub use file_fetch::FileTopicResponse;
pub use file_fetch::KfFileFetchRequest;
pub use file_produce::FileProduceRequest;
pub use file_produce::FileTopicRequest;
pub use file_produce::FilePartitionRequest;
pub use file_fetch::KfFileRecordSet;

use std::io::Error as IoError;

use log::trace;

use flv_future_aio::bytes::Bytes;
use flv_future_aio::bytes::BytesMut;
use flv_future_aio::fs::AsyncFileSlice;
use kf_protocol_core::Version;
use kf_protocol_api::Request;
use kf_protocol_api::RequestMessage;
use kf_protocol_api::ResponseMessage;
use kf_protocol_core::Encoder;


pub enum StoreValue {
    Bytes(Bytes),
    FileSlice(AsyncFileSlice),
}

impl std::fmt::Debug for StoreValue {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            StoreValue::Bytes(bytes) => write!(f, "StoreValue:Bytes with len: {}", bytes.len()),
            StoreValue::FileSlice(slice) => write!(f, "StoreValue:FileSlice: {:#?}", slice),
        }
    }
}

pub trait FileWrite: Encoder {
    fn file_encode(
        &self,
        src: &mut BytesMut,
        _data: &mut Vec<StoreValue>,
        version: Version,
    ) -> Result<(), IoError> {
        self.encode(src, version)
    }
}

impl<M> FileWrite for Vec<M>
where
    M: FileWrite,
{
    fn file_encode(
        &self,
        src: &mut BytesMut,
        data: &mut Vec<StoreValue>,
        version: Version,
    ) -> Result<(), IoError> {
        let len: i32 = self.len() as i32;
        len.encode(src, version)?;
        for v in self {
            v.file_encode(src, data, version)?;
        }
        Ok(())
    }
}

/// This is same as encoding in the ResponseMessage but can encode async file slice
impl<P> FileWrite for ResponseMessage<P>
where
    P: FileWrite + Default,
{
    fn file_encode(
        &self,
        dest: &mut BytesMut,
        data: &mut Vec<StoreValue>,
        version: Version,
    ) -> Result<(), IoError> {

        let len = self.write_size(version) as i32;
        log::debug!("encoding file write response: {} version: {}, len: {}", std::any::type_name::<P>(),version, len);
        len.encode(dest, version)?;

        trace!("encoding response correlation  id: {}", self.correlation_id);
        self.correlation_id.encode(dest, version)?;

        trace!("encoding response");
        self.response.file_encode(dest, data, version)?;
        Ok(())
    }
}

impl<R> FileWrite for RequestMessage<R>
where
    R: FileWrite + Default + Request,
{
    fn file_encode(
        &self,
        dest: &mut BytesMut,
        data: &mut Vec<StoreValue>,
        version: Version,
    ) -> Result<(), IoError> {
        trace!("file encoding response message");
        let len = self.write_size(version) as i32;
        trace!("file encoding response len: {}", len);
        len.encode(dest, version)?;

        trace!("file encoding header");
        self.header.encode(dest, version)?;

        trace!("encoding response");
        self.request.file_encode(dest, data, version)?;
        Ok(())
    }
}