1use std::time::{Duration, Instant};
16
17use rama::{Context, Service};
18use tansu_sans_io::{
19 ApiKey, ErrorCode, FetchRequest, FetchResponse, IsolationLevel,
20 fetch_request::{FetchPartition, FetchTopic},
21 fetch_response::{
22 EpochEndOffset, FetchableTopicResponse, LeaderIdAndEpoch, PartitionData, SnapshotId,
23 },
24 metadata_response::MetadataResponseTopic,
25 record::deflated::{Batch, Frame},
26};
27use tokio::time::sleep;
28use tracing::{debug, error, instrument};
29
30use crate::{Error, Result, Storage, Topition};
31
/// Stateless service that answers [`FetchRequest`]s using a [`Storage`] backend.
///
/// The type carries no data: all state is reached through the request
/// [`Context`], so the service is freely copyable.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FetchService;
122
123impl ApiKey for FetchService {
124 const KEY: i16 = FetchRequest::KEY;
125}
126
127impl FetchService {
128 #[allow(clippy::too_many_arguments)]
129 async fn fetch_partition<G>(
130 &self,
131 ctx: Context<G>,
132 max_wait_ms: Duration,
133 min_bytes: u32,
134 max_bytes: &mut u32,
135 isolation: IsolationLevel,
136 topic: &str,
137 fetch_partition: &FetchPartition,
138 ) -> Result<PartitionData>
139 where
140 G: Storage,
141 {
142 debug!(
143 ?max_wait_ms,
144 ?min_bytes,
145 ?max_bytes,
146 ?isolation,
147 ?fetch_partition
148 );
149
150 let partition_index = fetch_partition.partition;
151 let tp = Topition::new(topic, partition_index);
152
153 let mut batches = Vec::new();
154
155 let mut offset = fetch_partition.fetch_offset;
156
157 loop {
158 if *max_bytes == 0 {
159 break;
160 }
161
162 debug!(offset);
163
164 let mut fetched = ctx
165 .state()
166 .fetch(&tp, offset, min_bytes, *max_bytes, isolation)
167 .await
168 .inspect(|r| debug!(?tp, ?offset, ?r))
169 .inspect_err(|error| error!(?tp, ?error))?;
170
171 *max_bytes =
172 u32::try_from(fetched.byte_size()).map(|bytes| max_bytes.saturating_sub(bytes))?;
173
174 offset += fetched
175 .iter()
176 .map(|batch| batch.record_count as i64)
177 .sum::<i64>();
178
179 debug!(?offset, ?fetched);
180
181 if fetched.is_empty() || fetched.first().is_some_and(|batch| batch.record_count == 0) {
182 break;
183 } else {
184 batches.append(&mut fetched);
185 }
186 }
187
188 let offset_stage = ctx
189 .state()
190 .offset_stage(&tp)
191 .await
192 .inspect_err(|error| error!(?error, ?tp))?;
193
194 Ok(PartitionData::default()
195 .partition_index(partition_index)
196 .error_code(ErrorCode::None.into())
197 .high_watermark(offset_stage.high_watermark())
198 .last_stable_offset(Some(offset_stage.last_stable()))
199 .log_start_offset(Some(offset_stage.log_start()))
200 .diverging_epoch(None)
201 .current_leader(None)
202 .snapshot_id(None)
203 .aborted_transactions(Some([].into()))
204 .preferred_read_replica(Some(-1))
205 .records(if batches.is_empty() {
206 None
207 } else {
208 Some(Frame { batches })
209 }))
210 .inspect(|r| debug!(?r))
211 }
212
213 fn unknown_topic_response(&self, fetch: &FetchTopic) -> Result<FetchableTopicResponse> {
214 Ok(FetchableTopicResponse::default()
215 .topic(fetch.topic.clone())
216 .topic_id(Some([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))
217 .partitions(fetch.partitions.as_ref().map(|partitions| {
218 partitions
219 .iter()
220 .map(|partition| {
221 PartitionData::default()
222 .partition_index(partition.partition)
223 .error_code(ErrorCode::UnknownTopicOrPartition.into())
224 .high_watermark(0)
225 .last_stable_offset(Some(0))
226 .log_start_offset(Some(-1))
227 .diverging_epoch(Some(
228 EpochEndOffset::default().epoch(-1).end_offset(-1),
229 ))
230 .current_leader(Some(
231 LeaderIdAndEpoch::default().leader_id(0).leader_epoch(0),
232 ))
233 .snapshot_id(Some(SnapshotId::default().end_offset(-1).epoch(-1)))
234 .aborted_transactions(Some([].into()))
235 .preferred_read_replica(Some(-1))
236 .records(None)
237 })
238 .collect()
239 })))
240 }
241
242 #[allow(clippy::too_many_arguments)]
243 async fn fetch_topic<G>(
244 &self,
245 ctx: Context<G>,
246 max_wait_ms: Duration,
247 min_bytes: u32,
248 max_bytes: &mut u32,
249 isolation: IsolationLevel,
250 fetch: &FetchTopic,
251 _is_first: bool,
252 ) -> Result<FetchableTopicResponse>
253 where
254 G: Storage,
255 {
256 debug!(?max_wait_ms, ?min_bytes, ?isolation, ?fetch);
257
258 let metadata = ctx.state().metadata(Some(&[fetch.into()])).await?;
259
260 if let Some(MetadataResponseTopic {
261 topic_id,
262 name: Some(name),
263 ..
264 }) = metadata.topics().first()
265 {
266 let mut partitions = Vec::new();
267
268 for fetch_partition in fetch.partitions.as_ref().unwrap_or(&Vec::new()) {
269 let partition = self
270 .fetch_partition(
271 ctx.clone(),
272 max_wait_ms,
273 min_bytes,
274 max_bytes,
275 isolation,
276 name,
277 fetch_partition,
278 )
279 .await?;
280
281 partitions.push(partition);
282 }
283
284 Ok(FetchableTopicResponse::default()
285 .topic(fetch.topic.to_owned())
286 .topic_id(topic_id.to_owned())
287 .partitions(Some(partitions)))
288 } else {
289 self.unknown_topic_response(fetch)
290 }
291 }
292
293 pub(crate) async fn fetch<G>(
294 &self,
295 ctx: Context<G>,
296 max_wait: Duration,
297 min_bytes: u32,
298 max_bytes: &mut u32,
299 isolation: IsolationLevel,
300 topics: &[FetchTopic],
301 ) -> Result<Vec<FetchableTopicResponse>>
302 where
303 G: Storage,
304 {
305 debug!(?max_wait, ?min_bytes, ?isolation, ?topics);
306
307 if topics.is_empty() {
308 Ok(vec![])
309 } else {
310 let start = Instant::now();
311 let mut responses = vec![];
312 let mut iteration = 0;
313 let mut elapsed = Duration::from_millis(0);
314 let mut bytes = 0;
315
316 while elapsed <= max_wait && bytes <= min_bytes {
317 debug!(?elapsed, ?max_wait, ?bytes, ?min_bytes);
318
319 let enumerate = topics.iter().enumerate();
320 responses.clear();
321
322 for (i, fetch) in enumerate {
323 let fetch_response = self
324 .fetch_topic(
325 ctx.clone(),
326 max_wait,
327 min_bytes,
328 max_bytes,
329 isolation,
330 fetch,
331 i == 0,
332 )
333 .await?;
334
335 responses.push(fetch_response);
336 }
337
338 bytes += u32::try_from(responses.byte_size())?;
339
340 let now = Instant::now();
341 elapsed = now.duration_since(start);
342 let remaining = max_wait.saturating_sub(elapsed);
343
344 debug!(
345 ?iteration,
346 ?max_wait,
347 ?elapsed,
348 ?remaining,
349 ?bytes,
350 ?min_bytes
351 );
352
353 sleep(if remaining.as_millis() >= 250 {
354 remaining / 2
355 } else {
356 remaining
357 })
358 .await;
359
360 iteration += 1;
361 }
362
363 Ok(responses)
364 }
365 }
366}
367
368impl<G> Service<G, FetchRequest> for FetchService
369where
370 G: Storage,
371{
372 type Response = FetchResponse;
373 type Error = Error;
374
375 #[instrument(skip(ctx, req))]
376 async fn serve(
377 &self,
378 ctx: Context<G>,
379 req: FetchRequest,
380 ) -> Result<Self::Response, Self::Error> {
381 let responses = Some(if let Some(topics) = req.topics {
382 let isolation_level = req
383 .isolation_level
384 .map_or(Ok(IsolationLevel::ReadUncommitted), |isolation| {
385 IsolationLevel::try_from(isolation)
386 })?;
387
388 let max_wait_ms = u64::try_from(req.max_wait_ms).map(Duration::from_millis)?;
389
390 let min_bytes = u32::try_from(req.min_bytes)?;
391
392 const DEFAULT_MAX_BYTES: u32 = 5 * 1024 * 1024;
393
394 let mut max_bytes = req.max_bytes.map_or(Ok(DEFAULT_MAX_BYTES), |max_bytes| {
395 u32::try_from(max_bytes).map(|max_bytes| max_bytes.min(DEFAULT_MAX_BYTES))
396 })?;
397
398 self.fetch(
399 ctx,
400 max_wait_ms,
401 min_bytes,
402 &mut max_bytes,
403 isolation_level,
404 topics.as_ref(),
405 )
406 .await?
407 } else {
408 vec![]
409 });
410
411 Ok(FetchResponse::default()
412 .throttle_time_ms(Some(0))
413 .error_code(Some(ErrorCode::None.into()))
414 .session_id(Some(0))
415 .node_endpoints(Some([].into()))
416 .responses(responses))
417 .inspect(|r| debug!(?r))
418 }
419}
420
/// Size accounting used to enforce the fetch byte limits.
trait ByteSize {
    /// Returns the number of bytes this value contributes to a fetch response.
    fn byte_size(&self) -> u64;
}
424
425impl<T> ByteSize for Vec<T>
426where
427 T: ByteSize,
428{
429 fn byte_size(&self) -> u64 {
430 self.iter().map(|item| item.byte_size()).sum()
431 }
432}
433
434impl<T> ByteSize for Option<T>
435where
436 T: ByteSize,
437{
438 fn byte_size(&self) -> u64 {
439 self.as_ref().map_or(0, |some| some.byte_size())
440 }
441}
442
443impl ByteSize for Batch {
444 fn byte_size(&self) -> u64 {
445 self.record_data.len() as u64
446 }
447}
448
449impl ByteSize for Frame {
450 fn byte_size(&self) -> u64 {
451 self.batches.byte_size()
452 }
453}
454
455impl ByteSize for PartitionData {
456 fn byte_size(&self) -> u64 {
457 self.records.byte_size()
458 }
459}
460
461impl ByteSize for FetchableTopicResponse {
462 fn byte_size(&self) -> u64 {
463 self.partitions.byte_size()
464 }
465}