Skip to main content

crabka_protocol/legacy_compat/
fetch.rs

1use crate::kafka_3_6_2;
2use crate::owned::fetch_request::FetchRequest;
3use crate::owned::fetch_response::FetchResponse;
4
5// ── Request: legacy → canonical ──────────────────────────────────────────────
6
7impl From<kafka_3_6_2::owned::fetch_request::FetchRequest> for FetchRequest {
8    fn from(l: kafka_3_6_2::owned::fetch_request::FetchRequest) -> Self {
9        Self {
10            replica_id: l.replica_id,
11            max_wait_ms: l.max_wait_ms,
12            min_bytes: l.min_bytes,
13            max_bytes: l.max_bytes,
14            isolation_level: l.isolation_level,
15            session_id: l.session_id,
16            session_epoch: l.session_epoch,
17            topics: l.topics.into_iter().map(Into::into).collect(),
18            forgotten_topics_data: l
19                .forgotten_topics_data
20                .into_iter()
21                .map(Into::into)
22                .collect(),
23            rack_id: l.rack_id,
24            cluster_id: l.cluster_id,
25            replica_state: l.replica_state.into(),
26            ..Default::default()
27        }
28    }
29}
30
31impl From<kafka_3_6_2::owned::fetch_request::ReplicaState>
32    for crate::owned::fetch_request::ReplicaState
33{
34    fn from(l: kafka_3_6_2::owned::fetch_request::ReplicaState) -> Self {
35        Self {
36            replica_id: l.replica_id,
37            replica_epoch: l.replica_epoch,
38            ..Default::default()
39        }
40    }
41}
42
43impl From<kafka_3_6_2::owned::fetch_request::FetchTopic>
44    for crate::owned::fetch_request::FetchTopic
45{
46    fn from(l: kafka_3_6_2::owned::fetch_request::FetchTopic) -> Self {
47        Self {
48            topic: l.topic,
49            // topic_id (v13+) defaults to Uuid::nil()
50            partitions: l.partitions.into_iter().map(Into::into).collect(),
51            ..Default::default()
52        }
53    }
54}
55
56impl From<kafka_3_6_2::owned::fetch_request::FetchPartition>
57    for crate::owned::fetch_request::FetchPartition
58{
59    fn from(l: kafka_3_6_2::owned::fetch_request::FetchPartition) -> Self {
60        Self {
61            partition: l.partition,
62            current_leader_epoch: l.current_leader_epoch,
63            fetch_offset: l.fetch_offset,
64            last_fetched_epoch: l.last_fetched_epoch,
65            log_start_offset: l.log_start_offset,
66            partition_max_bytes: l.partition_max_bytes,
67            // replica_directory_id (v17+ tagged) and high_watermark (v18+ tagged) default
68            ..Default::default()
69        }
70    }
71}
72
73impl From<kafka_3_6_2::owned::fetch_request::ForgottenTopic>
74    for crate::owned::fetch_request::ForgottenTopic
75{
76    fn from(l: kafka_3_6_2::owned::fetch_request::ForgottenTopic) -> Self {
77        Self {
78            topic: l.topic,
79            // topic_id (v13+) defaults to Uuid::nil()
80            partitions: l.partitions,
81            ..Default::default()
82        }
83    }
84}
85
86// ── Response: canonical → legacy ─────────────────────────────────────────────
87
88impl From<FetchResponse> for kafka_3_6_2::owned::fetch_response::FetchResponse {
89    fn from(c: FetchResponse) -> Self {
90        Self {
91            throttle_time_ms: c.throttle_time_ms,
92            error_code: c.error_code,
93            session_id: c.session_id,
94            responses: c.responses.into_iter().map(Into::into).collect(),
95            // node_endpoints (canonical v16+ tagged field) dropped — not present in 3.6.2 schema
96            ..Default::default()
97        }
98    }
99}
100
101impl From<crate::owned::fetch_response::FetchableTopicResponse>
102    for kafka_3_6_2::owned::fetch_response::FetchableTopicResponse
103{
104    fn from(c: crate::owned::fetch_response::FetchableTopicResponse) -> Self {
105        Self {
106            topic: c.topic,
107            topic_id: c.topic_id,
108            partitions: c.partitions.into_iter().map(Into::into).collect(),
109            ..Default::default()
110        }
111    }
112}
113
114impl From<crate::owned::fetch_response::PartitionData>
115    for kafka_3_6_2::owned::fetch_response::PartitionData
116{
117    fn from(c: crate::owned::fetch_response::PartitionData) -> Self {
118        Self {
119            partition_index: c.partition_index,
120            error_code: c.error_code,
121            high_watermark: c.high_watermark,
122            last_stable_offset: c.last_stable_offset,
123            log_start_offset: c.log_start_offset,
124            aborted_transactions: c
125                .aborted_transactions
126                .map(|v| v.into_iter().map(Into::into).collect()),
127            preferred_read_replica: c.preferred_read_replica,
128            records: c.records,
129            diverging_epoch: c.diverging_epoch.into(),
130            current_leader: c.current_leader.into(),
131            snapshot_id: c.snapshot_id.into(),
132            ..Default::default()
133        }
134    }
135}
136
137impl From<crate::owned::fetch_response::AbortedTransaction>
138    for kafka_3_6_2::owned::fetch_response::AbortedTransaction
139{
140    fn from(c: crate::owned::fetch_response::AbortedTransaction) -> Self {
141        Self {
142            producer_id: c.producer_id,
143            first_offset: c.first_offset,
144            ..Default::default()
145        }
146    }
147}
148
149impl From<crate::owned::fetch_response::EpochEndOffset>
150    for kafka_3_6_2::owned::fetch_response::EpochEndOffset
151{
152    fn from(c: crate::owned::fetch_response::EpochEndOffset) -> Self {
153        Self {
154            epoch: c.epoch,
155            end_offset: c.end_offset,
156            ..Default::default()
157        }
158    }
159}
160
161impl From<crate::owned::fetch_response::LeaderIdAndEpoch>
162    for kafka_3_6_2::owned::fetch_response::LeaderIdAndEpoch
163{
164    fn from(c: crate::owned::fetch_response::LeaderIdAndEpoch) -> Self {
165        Self {
166            leader_id: c.leader_id,
167            leader_epoch: c.leader_epoch,
168            ..Default::default()
169        }
170    }
171}
172
173impl From<crate::owned::fetch_response::SnapshotId>
174    for kafka_3_6_2::owned::fetch_response::SnapshotId
175{
176    fn from(c: crate::owned::fetch_response::SnapshotId) -> Self {
177        Self {
178            end_offset: c.end_offset,
179            epoch: c.epoch,
180            ..Default::default()
181        }
182    }
183}