Skip to main content

tansu_storage/service/
fetch.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::{Duration, Instant};
16
17use rama::{Context, Service};
18use tansu_sans_io::{
19    ApiKey, ErrorCode, FetchRequest, FetchResponse, IsolationLevel,
20    fetch_request::{FetchPartition, FetchTopic},
21    fetch_response::{
22        EpochEndOffset, FetchableTopicResponse, LeaderIdAndEpoch, PartitionData, SnapshotId,
23    },
24    metadata_response::MetadataResponseTopic,
25    record::deflated::{Batch, Frame},
26};
27use tokio::time::sleep;
28use tracing::{debug, error, instrument};
29
30use crate::{Error, Result, Storage, Topition};
31
/// A [`Service`] using [`Storage`] as the [`Context`] state, taking a [`FetchRequest`] and
/// returning a [`FetchResponse`].
///
/// # Examples
///
/// ```
34/// use rama::{Context, Layer as _, Service as _, layer::MapStateLayer};
35/// use tansu_sans_io::{
36///     CreateTopicsRequest, ErrorCode, FetchRequest,
37///     create_topics_request::CreatableTopic,
38///     fetch_request::{FetchPartition, FetchTopic},
39/// };
40/// use tansu_storage::{CreateTopicsService, Error, FetchService, StorageContainer};
41/// use url::Url;
42///
43/// # #[tokio::main]
44/// # async fn main() -> Result<(), Error> {
45/// const CLUSTER_ID: &str = "tansu";
46/// const NODE_ID: i32 = 111;
47/// const HOST: &str = "localhost";
48/// const PORT: i32 = 9092;
49///
50/// let storage = StorageContainer::builder()
51///     .cluster_id(CLUSTER_ID)
52///     .node_id(NODE_ID)
53///     .advertised_listener(Url::parse(&format!("tcp://{HOST}:{PORT}"))?)
54///     .storage(Url::parse("memory://tansu/")?)
55///     .build()
56///     .await?;
57///
58/// let create_topic = {
59///     let storage = storage.clone();
60///     MapStateLayer::new(|_| storage).into_layer(CreateTopicsService)
61/// };
62///
63/// let name = "abcba";
64///
65/// let response = create_topic
66///     .serve(
67///         Context::default(),
68///         CreateTopicsRequest::default()
69///             .topics(Some(vec![
70///                 CreatableTopic::default()
71///                     .name(name.into())
72///                     .num_partitions(5)
73///                     .replication_factor(3)
74///                     .assignments(Some([].into()))
75///                     .configs(Some([].into())),
76///             ]))
77///             .validate_only(Some(false)),
78///     )
79///     .await?;
80///
81/// let topics = response.topics.unwrap_or_default();
82/// assert_eq!(1, topics.len());
83/// assert_eq!(ErrorCode::None, ErrorCode::try_from(topics[0].error_code)?);
84///
85/// let fetch = {
86///     let storage = storage.clone();
87///     MapStateLayer::new(|_| storage).into_layer(FetchService)
88/// };
89///
90/// let partition = 0;
91///
92/// let response = fetch
93///     .serve(
94///         Context::default(),
95///         FetchRequest::default()
96///             .topics(Some(
97///                 [FetchTopic::default()
98///                     .topic(Some(name.into()))
99///                     .partitions(Some(
100///                         [FetchPartition::default().partition(partition)].into(),
101///                     ))]
102///                 .into(),
103///             ))
104///             .max_bytes(Some(0))
105///             .max_wait_ms(5_000),
106///     )
107///     .await?;
108///
109/// let topics = response.responses.as_deref().unwrap_or_default();
110/// assert_eq!(1, topics.len());
111/// let partitions = topics[0].partitions.as_deref().unwrap_or_default();
112/// assert_eq!(1, partitions.len());
113/// assert_eq!(
114///     ErrorCode::None,
115///     ErrorCode::try_from(partitions[0].error_code)?
116/// );
117/// # Ok(())
118/// # }
119/// ```
// Stateless unit type: the storage it reads from is taken from the `Context` state of each call.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FetchService;
122
// Gives `FetchService` the same API key constant as `FetchRequest`, the request type it serves.
impl ApiKey for FetchService {
    // Taken from the request type rather than restated as a literal.
    const KEY: i16 = FetchRequest::KEY;
}
126
127impl FetchService {
128    #[allow(clippy::too_many_arguments)]
129    async fn fetch_partition<G>(
130        &self,
131        ctx: Context<G>,
132        max_wait_ms: Duration,
133        min_bytes: u32,
134        max_bytes: &mut u32,
135        isolation: IsolationLevel,
136        topic: &str,
137        fetch_partition: &FetchPartition,
138    ) -> Result<PartitionData>
139    where
140        G: Storage,
141    {
142        debug!(
143            ?max_wait_ms,
144            ?min_bytes,
145            ?max_bytes,
146            ?isolation,
147            ?fetch_partition
148        );
149
150        let partition_index = fetch_partition.partition;
151        let tp = Topition::new(topic, partition_index);
152
153        let mut batches = Vec::new();
154
155        let mut offset = fetch_partition.fetch_offset;
156
157        loop {
158            if *max_bytes == 0 {
159                break;
160            }
161
162            debug!(offset);
163
164            let mut fetched = ctx
165                .state()
166                .fetch(&tp, offset, min_bytes, *max_bytes, isolation)
167                .await
168                .inspect(|r| debug!(?tp, ?offset, ?r))
169                .inspect_err(|error| error!(?tp, ?error))?;
170
171            *max_bytes =
172                u32::try_from(fetched.byte_size()).map(|bytes| max_bytes.saturating_sub(bytes))?;
173
174            offset += fetched
175                .iter()
176                .map(|batch| batch.record_count as i64)
177                .sum::<i64>();
178
179            debug!(?offset, ?fetched);
180
181            if fetched.is_empty() || fetched.first().is_some_and(|batch| batch.record_count == 0) {
182                break;
183            } else {
184                batches.append(&mut fetched);
185            }
186        }
187
188        let offset_stage = ctx
189            .state()
190            .offset_stage(&tp)
191            .await
192            .inspect_err(|error| error!(?error, ?tp))?;
193
194        Ok(PartitionData::default()
195            .partition_index(partition_index)
196            .error_code(ErrorCode::None.into())
197            .high_watermark(offset_stage.high_watermark())
198            .last_stable_offset(Some(offset_stage.last_stable()))
199            .log_start_offset(Some(offset_stage.log_start()))
200            .diverging_epoch(None)
201            .current_leader(None)
202            .snapshot_id(None)
203            .aborted_transactions(Some([].into()))
204            .preferred_read_replica(Some(-1))
205            .records(if batches.is_empty() {
206                None
207            } else {
208                Some(Frame { batches })
209            }))
210        .inspect(|r| debug!(?r))
211    }
212
213    fn unknown_topic_response(&self, fetch: &FetchTopic) -> Result<FetchableTopicResponse> {
214        Ok(FetchableTopicResponse::default()
215            .topic(fetch.topic.clone())
216            .topic_id(Some([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))
217            .partitions(fetch.partitions.as_ref().map(|partitions| {
218                partitions
219                    .iter()
220                    .map(|partition| {
221                        PartitionData::default()
222                            .partition_index(partition.partition)
223                            .error_code(ErrorCode::UnknownTopicOrPartition.into())
224                            .high_watermark(0)
225                            .last_stable_offset(Some(0))
226                            .log_start_offset(Some(-1))
227                            .diverging_epoch(Some(
228                                EpochEndOffset::default().epoch(-1).end_offset(-1),
229                            ))
230                            .current_leader(Some(
231                                LeaderIdAndEpoch::default().leader_id(0).leader_epoch(0),
232                            ))
233                            .snapshot_id(Some(SnapshotId::default().end_offset(-1).epoch(-1)))
234                            .aborted_transactions(Some([].into()))
235                            .preferred_read_replica(Some(-1))
236                            .records(None)
237                    })
238                    .collect()
239            })))
240    }
241
242    #[allow(clippy::too_many_arguments)]
243    async fn fetch_topic<G>(
244        &self,
245        ctx: Context<G>,
246        max_wait_ms: Duration,
247        min_bytes: u32,
248        max_bytes: &mut u32,
249        isolation: IsolationLevel,
250        fetch: &FetchTopic,
251        _is_first: bool,
252    ) -> Result<FetchableTopicResponse>
253    where
254        G: Storage,
255    {
256        debug!(?max_wait_ms, ?min_bytes, ?isolation, ?fetch);
257
258        let metadata = ctx.state().metadata(Some(&[fetch.into()])).await?;
259
260        if let Some(MetadataResponseTopic {
261            topic_id,
262            name: Some(name),
263            ..
264        }) = metadata.topics().first()
265        {
266            let mut partitions = Vec::new();
267
268            for fetch_partition in fetch.partitions.as_ref().unwrap_or(&Vec::new()) {
269                let partition = self
270                    .fetch_partition(
271                        ctx.clone(),
272                        max_wait_ms,
273                        min_bytes,
274                        max_bytes,
275                        isolation,
276                        name,
277                        fetch_partition,
278                    )
279                    .await?;
280
281                partitions.push(partition);
282            }
283
284            Ok(FetchableTopicResponse::default()
285                .topic(fetch.topic.to_owned())
286                .topic_id(topic_id.to_owned())
287                .partitions(Some(partitions)))
288        } else {
289            self.unknown_topic_response(fetch)
290        }
291    }
292
293    pub(crate) async fn fetch<G>(
294        &self,
295        ctx: Context<G>,
296        max_wait: Duration,
297        min_bytes: u32,
298        max_bytes: &mut u32,
299        isolation: IsolationLevel,
300        topics: &[FetchTopic],
301    ) -> Result<Vec<FetchableTopicResponse>>
302    where
303        G: Storage,
304    {
305        debug!(?max_wait, ?min_bytes, ?isolation, ?topics);
306
307        if topics.is_empty() {
308            Ok(vec![])
309        } else {
310            let start = Instant::now();
311            let mut responses = vec![];
312            let mut iteration = 0;
313            let mut elapsed = Duration::from_millis(0);
314            let mut bytes = 0;
315
316            while elapsed <= max_wait && bytes <= min_bytes {
317                debug!(?elapsed, ?max_wait, ?bytes, ?min_bytes);
318
319                let enumerate = topics.iter().enumerate();
320                responses.clear();
321
322                for (i, fetch) in enumerate {
323                    let fetch_response = self
324                        .fetch_topic(
325                            ctx.clone(),
326                            max_wait,
327                            min_bytes,
328                            max_bytes,
329                            isolation,
330                            fetch,
331                            i == 0,
332                        )
333                        .await?;
334
335                    responses.push(fetch_response);
336                }
337
338                bytes += u32::try_from(responses.byte_size())?;
339
340                let now = Instant::now();
341                elapsed = now.duration_since(start);
342                let remaining = max_wait.saturating_sub(elapsed);
343
344                debug!(
345                    ?iteration,
346                    ?max_wait,
347                    ?elapsed,
348                    ?remaining,
349                    ?bytes,
350                    ?min_bytes
351                );
352
353                sleep(if remaining.as_millis() >= 250 {
354                    remaining / 2
355                } else {
356                    remaining
357                })
358                .await;
359
360                iteration += 1;
361            }
362
363            Ok(responses)
364        }
365    }
366}
367
368impl<G> Service<G, FetchRequest> for FetchService
369where
370    G: Storage,
371{
372    type Response = FetchResponse;
373    type Error = Error;
374
375    #[instrument(skip(ctx, req))]
376    async fn serve(
377        &self,
378        ctx: Context<G>,
379        req: FetchRequest,
380    ) -> Result<Self::Response, Self::Error> {
381        let responses = Some(if let Some(topics) = req.topics {
382            let isolation_level = req
383                .isolation_level
384                .map_or(Ok(IsolationLevel::ReadUncommitted), |isolation| {
385                    IsolationLevel::try_from(isolation)
386                })?;
387
388            let max_wait_ms = u64::try_from(req.max_wait_ms).map(Duration::from_millis)?;
389
390            let min_bytes = u32::try_from(req.min_bytes)?;
391
392            const DEFAULT_MAX_BYTES: u32 = 5 * 1024 * 1024;
393
394            let mut max_bytes = req.max_bytes.map_or(Ok(DEFAULT_MAX_BYTES), |max_bytes| {
395                u32::try_from(max_bytes).map(|max_bytes| max_bytes.min(DEFAULT_MAX_BYTES))
396            })?;
397
398            self.fetch(
399                ctx,
400                max_wait_ms,
401                min_bytes,
402                &mut max_bytes,
403                isolation_level,
404                topics.as_ref(),
405            )
406            .await?
407        } else {
408            vec![]
409        });
410
411        Ok(FetchResponse::default()
412            .throttle_time_ms(Some(0))
413            .error_code(Some(ErrorCode::None.into()))
414            .session_id(Some(0))
415            .node_endpoints(Some([].into()))
416            .responses(responses))
417        .inspect(|r| debug!(?r))
418    }
419}
420
/// The number of bytes a value contributes to a fetch response.
trait ByteSize {
    /// Size of `self` in bytes.
    fn byte_size(&self) -> u64;
}

/// A vector is as large as the sum of its items; an empty vector has size zero.
impl<T: ByteSize> ByteSize for Vec<T> {
    fn byte_size(&self) -> u64 {
        self.iter().map(ByteSize::byte_size).sum()
    }
}

/// An absent value has size zero; a present value has the size of its content.
impl<T: ByteSize> ByteSize for Option<T> {
    fn byte_size(&self) -> u64 {
        match self {
            Some(inner) => inner.byte_size(),
            None => 0,
        }
    }
}
442
443impl ByteSize for Batch {
444    fn byte_size(&self) -> u64 {
445        self.record_data.len() as u64
446    }
447}
448
449impl ByteSize for Frame {
450    fn byte_size(&self) -> u64 {
451        self.batches.byte_size()
452    }
453}
454
455impl ByteSize for PartitionData {
456    fn byte_size(&self) -> u64 {
457        self.records.byte_size()
458    }
459}
460
461impl ByteSize for FetchableTopicResponse {
462    fn byte_size(&self) -> u64 {
463        self.partitions.byte_size()
464    }
465}