use std::fmt::Debug;
use std::io::{Error, ErrorKind};
use std::marker::PhantomData;
use std::time::Duration;
use bytes::{Buf, BufMut};
use crate::batch::RawRecords;
use crate::core::Encoder;
use crate::core::Decoder;
use crate::derive::FluvioDefault;
use crate::core::Version;
use crate::api::Request;
use crate::record::RecordSet;
use crate::Isolation;
use super::ProduceResponse;
pub type DefaultProduceRequest = ProduceRequest<RecordSet<RawRecords>>;
pub type DefaultPartitionRequest = PartitionProduceData<RecordSet<RawRecords>>;
pub type DefaultTopicRequest = TopicProduceData<RecordSet<RawRecords>>;
#[derive(FluvioDefault, Debug)]
pub struct ProduceRequest<R>
where
R: Encoder + Decoder + Default + Debug,
{
#[fluvio(min_version = 3)]
pub transactional_id: Option<String>,
pub isolation: Isolation,
pub timeout: Duration,
pub topics: Vec<TopicProduceData<R>>,
pub data: PhantomData<R>,
}
impl<R> Request for ProduceRequest<R>
where
R: Debug + Decoder + Encoder,
{
const API_KEY: u16 = 0;
const MIN_API_VERSION: i16 = 0;
const MAX_API_VERSION: i16 = 7;
const DEFAULT_API_VERSION: i16 = 7;
type Response = ProduceResponse;
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct TopicProduceData<R>
where
R: Encoder + Decoder + Default + Debug,
{
pub name: String,
pub partitions: Vec<PartitionProduceData<R>>,
pub data: PhantomData<R>,
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
pub struct PartitionProduceData<R>
where
R: Encoder + Decoder + Default + Debug,
{
pub partition_index: i32,
pub records: R,
}
impl<R> Encoder for ProduceRequest<R>
where
R: Encoder + Decoder + Default + Debug,
{
fn write_size(&self, version: Version) -> usize {
self.transactional_id.write_size(version)
+ IsolationData(0i16).write_size(version)
+ TimeoutData(0i32).write_size(version)
+ self.topics.write_size(version)
}
fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), Error>
where
T: BufMut,
{
self.transactional_id.encode(dest, version)?;
IsolationData::from(self.isolation).encode(dest, version)?;
TimeoutData::try_from(self.timeout)?.encode(dest, version)?;
self.topics.encode(dest, version)?;
Ok(())
}
}
impl<R> Decoder for ProduceRequest<R>
where
R: Decoder + Encoder + Default + Debug,
{
fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), Error>
where
T: Buf,
{
self.transactional_id = Decoder::decode_from(src, version)?;
self.isolation = Isolation::from(IsolationData::decode_from(src, version)?);
self.timeout = Duration::try_from(TimeoutData::decode_from(src, version)?)?;
self.topics = Decoder::decode_from(src, version)?;
Ok(())
}
}
impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for ProduceRequest<R> {
fn clone(&self) -> Self {
Self {
transactional_id: self.transactional_id.clone(),
isolation: self.isolation,
timeout: self.timeout,
topics: self.topics.clone(),
data: self.data,
}
}
}
impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for TopicProduceData<R> {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
partitions: self.partitions.clone(),
data: self.data,
}
}
}
impl<R: Encoder + Decoder + Default + Debug + Clone> Clone for PartitionProduceData<R> {
fn clone(&self) -> Self {
Self {
partition_index: self.partition_index,
records: self.records.clone(),
}
}
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
struct IsolationData(i16);
impl From<Isolation> for IsolationData {
fn from(isolation: Isolation) -> Self {
IsolationData(match isolation {
Isolation::ReadUncommitted => 1,
Isolation::ReadCommitted => -1,
})
}
}
impl From<IsolationData> for Isolation {
fn from(data: IsolationData) -> Self {
match data.0 {
acks if acks < 0 => Isolation::ReadCommitted,
_ => Isolation::ReadUncommitted,
}
}
}
#[derive(Encoder, Decoder, FluvioDefault, Debug)]
struct TimeoutData(i32);
impl TryFrom<Duration> for TimeoutData {
type Error = Error;
fn try_from(value: Duration) -> Result<Self, Self::Error> {
value.as_millis().try_into().map(TimeoutData).map_err(|_e| {
Error::new(
ErrorKind::InvalidInput,
"Timeout must fit into 4 bytes integer value",
)
})
}
}
impl TryFrom<TimeoutData> for Duration {
type Error = Error;
fn try_from(value: TimeoutData) -> Result<Self, Self::Error> {
u64::try_from(value.0)
.map(Duration::from_millis)
.map_err(|_e| {
Error::new(
ErrorKind::InvalidInput,
"Timeout must be positive integer value",
)
})
}
}
#[cfg(feature = "file")]
pub use file::*;
#[cfg(feature = "file")]
mod file {
use std::io::Error as IoError;
use tracing::trace;
use bytes::BytesMut;
use crate::core::Version;
use crate::record::FileRecordSet;
use crate::store::FileWrite;
use crate::store::StoreValue;
use super::*;
pub type FileProduceRequest = ProduceRequest<FileRecordSet>;
pub type FileTopicRequest = TopicProduceData<FileRecordSet>;
pub type FilePartitionRequest = PartitionProduceData<FileRecordSet>;
impl FileWrite for FileProduceRequest {
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding produce request");
self.transactional_id.encode(src, version)?;
IsolationData::from(self.isolation).encode(src, version)?;
TimeoutData::try_from(self.timeout)?.encode(src, version)?;
self.topics.file_encode(src, data, version)?;
Ok(())
}
}
impl FileWrite for FileTopicRequest {
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding produce topic request");
self.name.encode(src, version)?;
self.partitions.file_encode(src, data, version)?;
Ok(())
}
}
impl FileWrite for FilePartitionRequest {
fn file_encode(
&self,
src: &mut BytesMut,
data: &mut Vec<StoreValue>,
version: Version,
) -> Result<(), IoError> {
trace!("file encoding for partition request");
self.partition_index.encode(src, version)?;
self.records.file_encode(src, data, version)?;
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use std::io::{Error, ErrorKind};
use std::time::Duration;
use fluvio_protocol::{Decoder, Encoder};
use crate::Isolation;
use crate::produce::DefaultProduceRequest;
use crate::produce::TopicProduceData;
use crate::produce::PartitionProduceData;
use fluvio_protocol::api::Request;
use crate::batch::Batch;
use crate::record::{Record, RecordData, RecordSet};
#[test]
fn test_encode_decode_produce_request_isolation_timeout() -> Result<(), Error> {
let request = DefaultProduceRequest {
isolation: Isolation::ReadCommitted,
timeout: Duration::from_millis(123456),
..Default::default()
};
let version = DefaultProduceRequest::DEFAULT_API_VERSION;
let mut bytes = request.as_bytes(version)?;
let decoded: DefaultProduceRequest = Decoder::decode_from(&mut bytes, version)?;
assert_eq!(request.isolation, decoded.isolation);
assert_eq!(request.timeout, decoded.timeout);
Ok(())
}
#[test]
fn test_encode_produce_request_timeout_too_big() {
let request = DefaultProduceRequest {
isolation: Isolation::ReadCommitted,
timeout: Duration::from_millis(u64::MAX),
..Default::default()
};
let version = DefaultProduceRequest::DEFAULT_API_VERSION;
let result = request.as_bytes(version).expect_err("expected error");
assert_eq!(result.kind(), ErrorKind::InvalidInput);
assert_eq!(
result.to_string(),
"Timeout must fit into 4 bytes integer value"
);
}
#[test]
fn test_default_produce_request_clone() {
let request = DefaultProduceRequest {
transactional_id: Some("transaction_id".to_string()),
isolation: Default::default(),
timeout: Duration::from_millis(100),
topics: vec![TopicProduceData {
name: "topic".to_string(),
partitions: vec![PartitionProduceData {
partition_index: 1,
records: RecordSet {
batches: vec![Batch::from(vec![Record::new(RecordData::from(
"some raw data",
))])
.try_into()
.expect("compressed batch")],
},
}],
data: Default::default(),
}],
data: Default::default(),
};
let version = DefaultProduceRequest::DEFAULT_API_VERSION;
let cloned = request.clone();
let bytes = request.as_bytes(version).expect("encoded request");
let cloned_bytes = cloned.as_bytes(version).expect("encoded cloned request");
assert_eq!(bytes, cloned_bytes);
}
}