Skip to main content

crabka_protocol/legacy_compat/
produce.rs

1use crate::kafka_3_6_2;
2use crate::owned::produce_request::ProduceRequest;
3use crate::owned::produce_response::ProduceResponse;
4
5// ── Request: legacy → canonical ──────────────────────────────────────────────
6
7impl From<kafka_3_6_2::owned::produce_request::ProduceRequest> for ProduceRequest {
8    fn from(legacy: kafka_3_6_2::owned::produce_request::ProduceRequest) -> Self {
9        Self {
10            transactional_id: legacy.transactional_id,
11            acks: legacy.acks,
12            timeout_ms: legacy.timeout_ms,
13            topic_data: legacy.topic_data.into_iter().map(Into::into).collect(),
14            ..Default::default()
15        }
16    }
17}
18
19impl From<kafka_3_6_2::owned::produce_request::TopicProduceData>
20    for crate::owned::produce_request::TopicProduceData
21{
22    fn from(l: kafka_3_6_2::owned::produce_request::TopicProduceData) -> Self {
23        Self {
24            name: l.name,
25            partition_data: l.partition_data.into_iter().map(Into::into).collect(),
26            // topic_id (v13+) defaults to Uuid::nil()
27            ..Default::default()
28        }
29    }
30}
31
32impl From<kafka_3_6_2::owned::produce_request::PartitionProduceData>
33    for crate::owned::produce_request::PartitionProduceData
34{
35    fn from(l: kafka_3_6_2::owned::produce_request::PartitionProduceData) -> Self {
36        Self {
37            index: l.index,
38            // RecordsPayload is the same type across both namespaces
39            records: l.records,
40            ..Default::default()
41        }
42    }
43}
44
45// ── Response: canonical → legacy ─────────────────────────────────────────────
46
47impl From<ProduceResponse> for kafka_3_6_2::owned::produce_response::ProduceResponse {
48    fn from(c: ProduceResponse) -> Self {
49        Self {
50            responses: c.responses.into_iter().map(Into::into).collect(),
51            throttle_time_ms: c.throttle_time_ms,
52            ..Default::default()
53        }
54    }
55}
56
57impl From<crate::owned::produce_response::TopicProduceResponse>
58    for kafka_3_6_2::owned::produce_response::TopicProduceResponse
59{
60    fn from(c: crate::owned::produce_response::TopicProduceResponse) -> Self {
61        Self {
62            name: c.name,
63            partition_responses: c.partition_responses.into_iter().map(Into::into).collect(),
64            ..Default::default()
65        }
66    }
67}
68
69impl From<crate::owned::produce_response::PartitionProduceResponse>
70    for kafka_3_6_2::owned::produce_response::PartitionProduceResponse
71{
72    fn from(c: crate::owned::produce_response::PartitionProduceResponse) -> Self {
73        Self {
74            index: c.index,
75            error_code: c.error_code,
76            base_offset: c.base_offset,
77            log_append_time_ms: c.log_append_time_ms,
78            log_start_offset: c.log_start_offset,
79            record_errors: c.record_errors.into_iter().map(Into::into).collect(),
80            error_message: c.error_message,
81            ..Default::default() // Defaulted: current_leader (canonical-only v10+ tagged field)
82        }
83    }
84}
85
86impl From<crate::owned::produce_response::BatchIndexAndErrorMessage>
87    for kafka_3_6_2::owned::produce_response::BatchIndexAndErrorMessage
88{
89    fn from(c: crate::owned::produce_response::BatchIndexAndErrorMessage) -> Self {
90        Self {
91            batch_index: c.batch_index,
92            batch_index_error_message: c.batch_index_error_message,
93            ..Default::default()
94        }
95    }
96}