fluss/rpc/message/
produce_log.rs1use crate::error::Result as FlussResult;
19use crate::proto::{PbProduceLogReqForBucket, ProduceLogResponse};
20use crate::rpc::frame::ReadError;
21
22use crate::client::ReadyWriteBatch;
23use crate::rpc::api_key::ApiKey;
24use crate::rpc::api_version::ApiVersion;
25use crate::rpc::frame::WriteError;
26use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
27use crate::{impl_read_version_type, impl_write_version_type, proto};
28use bytes::{Buf, BufMut};
29use prost::Message;
30
31pub struct ProduceLogRequest {
32 pub inner_request: proto::ProduceLogRequest,
33}
34
35impl ProduceLogRequest {
36 pub fn new(
37 table_id: i64,
38 ack: i16,
39 max_request_timeout_ms: i32,
40 ready_batches: &mut [ReadyWriteBatch],
41 ) -> FlussResult<Self> {
42 let mut request = proto::ProduceLogRequest {
43 table_id,
44 acks: ack as i32,
45 timeout_ms: max_request_timeout_ms,
46 ..Default::default()
47 };
48 for ready_batch in ready_batches {
49 request.buckets_req.push(PbProduceLogReqForBucket {
50 partition_id: ready_batch.table_bucket.partition_id(),
51 bucket_id: ready_batch.table_bucket.bucket_id(),
52 records: ready_batch.write_batch.build()?,
53 })
54 }
55
56 Ok(ProduceLogRequest {
57 inner_request: request,
58 })
59 }
60}
61
62impl RequestBody for ProduceLogRequest {
63 type ResponseBody = ProduceLogResponse;
64
65 const API_KEY: ApiKey = ApiKey::ProduceLog;
66
67 const REQUEST_VERSION: ApiVersion = ApiVersion(0);
68}
69
70impl_write_version_type!(ProduceLogRequest);
71impl_read_version_type!(ProduceLogResponse);