fluvio_protocol_api/
api.rs

1use std::default::Default;
2use std::io::Error as IoError;
3use std::io::ErrorKind;
4use std::fs::File;
5use std::io::Cursor;
6use std::path::Path;
7use std::io::Read;
8use std::fmt::Debug;
9use std::fmt;
10use std::convert::TryFrom;
11
12use tracing::debug;
13use tracing::trace;
14
15use crate::core::Decoder;
16use crate::core::Encoder;
17use crate::derive::Decoder;
18use crate::derive::Encoder;
19use crate::core::bytes::Buf;
20
21pub trait Request: Encoder + Decoder + Debug {
22    const API_KEY: u16;
23
24    const DEFAULT_API_VERSION: i16 = 0;
25    const MIN_API_VERSION: i16 = 0;
26    const MAX_API_VERSION: i16 = -1;
27
28    type Response: Encoder + Decoder + Debug;
29}
30
31pub trait ApiMessage: Sized + Default {
32    type ApiKey: Decoder + Debug;
33
34    fn decode_with_header<T>(src: &mut T, header: RequestHeader) -> Result<Self, IoError>
35    where
36        Self: Default + Sized,
37        Self::ApiKey: Sized,
38        T: Buf;
39
40    fn decode_from<T>(src: &mut T) -> Result<Self, IoError>
41    where
42        T: Buf,
43    {
44        let header = RequestHeader::decode_from(src, 0)?;
45        Self::decode_with_header(src, header)
46    }
47
48    fn decode_from_file<P: AsRef<Path>>(file_name: P) -> Result<Self, IoError> {
49        debug!("decoding from file: {:#?}", file_name.as_ref());
50        let mut f = File::open(file_name)?;
51        let mut buffer: [u8; 1000] = [0; 1000];
52
53        f.read_exact(&mut buffer)?;
54
55        let data = buffer.to_vec();
56        let mut src = Cursor::new(&data);
57
58        let mut size: i32 = 0;
59        size.decode(&mut src, 0)?;
60        trace!("decoded request size: {} bytes", size);
61
62        if src.remaining() < size as usize {
63            return Err(IoError::new(
64                ErrorKind::UnexpectedEof,
65                "not enought bytes for request message",
66            ));
67        }
68
69        Self::decode_from(&mut src)
70    }
71}
72
73pub trait ApiKey: Sized + Encoder + Decoder + TryFrom<u16> {}
74
75#[derive(Debug, Encoder, Decoder, Default)]
76pub struct RequestHeader {
77    api_key: u16,
78    api_version: i16,
79    correlation_id: i32,
80    client_id: String,
81}
82
83impl fmt::Display for RequestHeader {
84    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85        write!(f, "api: {} client: {}", self.api_key, self.client_id)
86    }
87}
88
89impl RequestHeader {
90    pub fn new(api_key: u16) -> Self {
91        // TODO: generate random client id
92        Self::new_with_client(api_key, "dummy".to_owned())
93    }
94
95    pub fn new_with_client<T>(api_key: u16, client_id: T) -> Self
96    where
97        T: Into<String>,
98    {
99        RequestHeader {
100            api_key,
101            api_version: 1,
102            correlation_id: 1,
103
104            client_id: client_id.into(),
105        }
106    }
107
108    pub fn api_key(&self) -> u16 {
109        self.api_key
110    }
111
112    pub fn api_version(&self) -> i16 {
113        self.api_version
114    }
115
116    pub fn set_api_version(&mut self, version: i16) -> &mut Self {
117        self.api_version = version;
118        self
119    }
120
121    pub fn correlation_id(&self) -> i32 {
122        self.correlation_id
123    }
124
125    pub fn set_correlation_id(&mut self, id: i32) -> &mut Self {
126        self.correlation_id = id;
127        self
128    }
129
130    pub fn client_id(&self) -> &String {
131        &self.client_id
132    }
133
134    pub fn set_client_id<T>(&mut self, client_id: T) -> &mut Self
135    where
136        T: Into<String>,
137    {
138        self.client_id = client_id.into();
139        self
140    }
141}
142
143impl From<&RequestHeader> for i32 {
144    fn from(header: &RequestHeader) -> i32 {
145        header.correlation_id()
146    }
147}