1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::sync::Arc;
use tracing::{debug, trace, instrument};
use dataplane::ReplicaKey;
use crate::FluvioError;
use crate::spu::SpuPool;
use crate::client::SerialFrame;
pub struct TopicProducer {
topic: String,
pool: Arc<SpuPool>,
}
impl TopicProducer {
pub(crate) fn new(topic: String, pool: Arc<SpuPool>) -> Self {
Self { topic, pool }
}
#[instrument(
skip(self, buffer),
fields(topic = &*self.topic),
)]
pub async fn send_record<B: AsRef<[u8]>>(
&self,
buffer: B,
partition: i32,
) -> Result<(), FluvioError> {
let record = buffer.as_ref();
let replica = ReplicaKey::new(&self.topic, partition);
debug!("sending records: {} bytes to: {}", record.len(), &replica);
let spu_client = self.pool.create_serial_socket(&replica).await?;
debug!("connect to replica leader at: {}", spu_client);
send_record_raw(spu_client, &replica, record).await
}
}
async fn send_record_raw<F: SerialFrame>(
mut leader: F,
replica: &ReplicaKey,
record: &[u8],
) -> Result<(), FluvioError> {
use dataplane::produce::DefaultProduceRequest;
use dataplane::produce::DefaultPartitionRequest;
use dataplane::produce::DefaultTopicRequest;
use dataplane::batch::DefaultBatch;
use dataplane::record::DefaultRecord;
let mut request = DefaultProduceRequest::default();
let mut topic_request = DefaultTopicRequest::default();
let mut partition_request = DefaultPartitionRequest::default();
debug!(
"send record {} bytes to: replica: {}, {}",
record.len(),
replica,
leader
);
let record_msg: DefaultRecord = record.into();
let mut batch = DefaultBatch::default();
batch.records.push(record_msg);
partition_request.partition_index = replica.partition;
partition_request.records.batches.push(batch);
topic_request.name = replica.topic.to_owned();
topic_request.partitions.push(partition_request);
request.acks = 1;
request.timeout_ms = 1500;
request.topics.push(topic_request);
trace!("produce request: {:#?}", request);
let response = leader.send_receive(request).await?;
trace!("received response: {:?}", response);
match response.find_partition_response(&replica.topic, replica.partition) {
Some(partition_response) => {
if partition_response.error_code.is_error() {
return Err(IoError::new(
ErrorKind::Other,
partition_response.error_code.to_sentence(),
)
.into());
}
Ok(())
}
None => Err(IoError::new(ErrorKind::Other, "unknown error").into()),
}
}