Skip to main content

fluss/client/table/
scanner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::client::connection::FlussConnection;
19use crate::client::credentials::SecurityTokenManager;
20use crate::client::metadata::Metadata;
21use crate::client::table::log_fetch_buffer::{
22    CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel,
23    LogFetchBuffer, RemotePendingFetch,
24};
25use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
26use crate::error::Error::UnsupportedOperation;
27use crate::error::{ApiError, Error, FlussError, Result};
28use crate::metadata::{LogFormat, PhysicalTablePath, TableBucket, TableInfo, TablePath};
29use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
30use crate::record::{
31    LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
32};
33use crate::rpc::{RpcClient, RpcError, message};
34use crate::util::FairBucketStatusMap;
35use crate::{PartitionId, TableId};
36use arrow_schema::SchemaRef;
37use log::{debug, warn};
38use parking_lot::{Mutex, RwLock};
39use std::{
40    collections::{HashMap, HashSet},
41    slice::from_ref,
42    sync::Arc,
43    time::{Duration, Instant},
44};
45use tempfile::TempDir;
46
/// Configuration step for scanning a table, consumed when a scanner is created.
///
/// A `TableScan` optionally narrows the columns to read (see `project` and
/// `project_by_name`) and is then turned into a [`LogScanner`] or a
/// [`RecordBatchLogScanner`].
pub struct TableScan<'a> {
    /// Borrowed connection; supplies the RPC connections and config when a scanner is created.
    conn: &'a FlussConnection,
    /// Description of the table to scan (path, id, row type).
    table_info: TableInfo,
    /// Shared metadata handle that is passed on to the created scanner.
    metadata: Arc<Metadata>,
    /// Column indices to project. None means all columns, Some(vec) means only the specified columns (non-empty).
    projected_fields: Option<Vec<usize>>,
}
54
55impl<'a> TableScan<'a> {
56    pub fn new(conn: &'a FlussConnection, table_info: TableInfo, metadata: Arc<Metadata>) -> Self {
57        Self {
58            conn,
59            table_info,
60            metadata,
61            projected_fields: None,
62        }
63    }
64
65    /// Projects the scan to only include specified columns by their indices.
66    ///
67    /// # Arguments
68    /// * `column_indices` - Zero-based indices of columns to include in the scan
69    ///
70    /// # Errors
71    /// Returns an error if `column_indices` is empty or if any column index is out of range.
72    ///
73    /// # Example
74    /// ```
75    /// # use fluss::client::FlussConnection;
76    /// # use fluss::config::Config;
77    /// # use fluss::error::Result;
78    /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
79    /// # use fluss::row::InternalRow;
80    /// # use std::time::Duration;
81    ///
82    /// # pub async fn example() -> Result<()> {
83    ///     let mut config = Config::default();
84    ///     config.bootstrap_servers = "127.0.0.1:9123".to_string();
85    ///     let conn = FlussConnection::new(config).await?;
86    ///
87    ///     let table_descriptor = TableDescriptor::builder()
88    ///         .schema(
89    ///             Schema::builder()
90    ///                 .column("col1", DataTypes::int())
91    ///                 .column("col2", DataTypes::string())
92    ///                 .column("col3", DataTypes::string())
93    ///                 .column("col4", DataTypes::string())
94    ///             .build()?,
95    ///         ).build()?;
96    ///     let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned());
97    ///     let admin = conn.get_admin()?;
98    ///     admin.create_table(&table_path, &table_descriptor, true)
99    ///         .await?;
100    ///     let table_info = admin.get_table_info(&table_path).await?;
101    ///     let table = conn.get_table(&table_path).await?;
102    ///
103    ///     // Project columns by indices
104    ///     let scanner = table.new_scan().project(&[0, 2, 3])?.create_log_scanner()?;
105    ///     let scan_records = scanner.poll(Duration::from_secs(10)).await?;
106    ///     for record in scan_records {
107    ///         let row = record.row();
108    ///         println!(
109    ///             "{{{}, {}, {}}}@{}",
110    ///             row.get_int(0)?,
111    ///             row.get_string(2)?,
112    ///             row.get_string(3)?,
113    ///             record.offset()
114    ///         );
115    ///     }
116    ///     # Ok(())
117    /// # }
118    /// ```
119    pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
120        if column_indices.is_empty() {
121            return Err(Error::IllegalArgument {
122                message: "Column indices cannot be empty".to_string(),
123            });
124        }
125        let field_count = self.table_info.row_type().fields().len();
126        for &idx in column_indices {
127            if idx >= field_count {
128                return Err(Error::IllegalArgument {
129                    message: format!(
130                        "Column index {} out of range (max: {})",
131                        idx,
132                        field_count - 1
133                    ),
134                });
135            }
136        }
137        self.projected_fields = Some(column_indices.to_vec());
138        Ok(self)
139    }
140
141    /// Projects the scan to only include specified columns by their names.
142    ///
143    /// # Arguments
144    /// * `column_names` - Names of columns to include in the scan
145    ///
146    /// # Errors
147    /// Returns an error if `column_names` is empty or if any column name is not found in the table schema.
148    ///
149    /// # Example
150    /// ```
151    /// # use fluss::client::FlussConnection;
152    /// # use fluss::config::Config;
153    /// # use fluss::error::Result;
154    /// # use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
155    /// # use fluss::row::InternalRow;
156    /// # use std::time::Duration;
157    ///
158    /// # pub async fn example() -> Result<()> {
159    ///     let mut config = Config::default();
160    ///     config.bootstrap_servers = "127.0.0.1:9123".to_string();
161    ///     let conn = FlussConnection::new(config).await?;
162    ///
163    ///     let table_descriptor = TableDescriptor::builder()
164    ///         .schema(
165    ///             Schema::builder()
166    ///                 .column("col1", DataTypes::int())
167    ///                 .column("col2", DataTypes::string())
168    ///                 .column("col3", DataTypes::string())
169    ///             .build()?,
170    ///         ).build()?;
171    ///     let table_path = TablePath::new("fluss".to_owned(), "rust_test_long".to_owned());
172    ///     let admin = conn.get_admin()?;
173    ///     admin.create_table(&table_path, &table_descriptor, true)
174    ///         .await?;
175    ///     let table = conn.get_table(&table_path).await?;
176    ///
177    ///     // Project columns by column names
178    ///     let scanner = table.new_scan().project_by_name(&["col1", "col3"])?.create_log_scanner()?;
179    ///     let scan_records = scanner.poll(Duration::from_secs(10)).await?;
180    ///     for record in scan_records {
181    ///         let row = record.row();
182    ///         println!(
183    ///             "{{{}, {}}}@{}",
184    ///             row.get_int(0)?,
185    ///             row.get_string(1)?,
186    ///             record.offset()
187    ///         );
188    ///     }
189    ///     # Ok(())
190    /// # }
191    /// ```
192    pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
193        if column_names.is_empty() {
194            return Err(Error::IllegalArgument {
195                message: "Column names cannot be empty".to_string(),
196            });
197        }
198        let row_type = self.table_info.row_type();
199        let mut indices = Vec::new();
200
201        for name in column_names {
202            let idx = row_type
203                .fields()
204                .iter()
205                .position(|f| f.name() == *name)
206                .ok_or_else(|| Error::IllegalArgument {
207                    message: format!("Column '{name}' not found"),
208                })?;
209            indices.push(idx);
210        }
211
212        self.projected_fields = Some(indices);
213        Ok(self)
214    }
215
216    pub fn create_log_scanner(self) -> Result<LogScanner> {
217        validate_scan_support(&self.table_info.table_path, &self.table_info)?;
218        let inner = LogScannerInner::new(
219            &self.table_info,
220            self.metadata.clone(),
221            self.conn.get_connections(),
222            self.conn.config(),
223            self.projected_fields,
224        )?;
225        Ok(LogScanner {
226            inner: Arc::new(inner),
227        })
228    }
229
230    pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
231        validate_scan_support(&self.table_info.table_path, &self.table_info)?;
232        let inner = LogScannerInner::new(
233            &self.table_info,
234            self.metadata.clone(),
235            self.conn.get_connections(),
236            self.conn.config(),
237            self.projected_fields,
238        )?;
239        Ok(RecordBatchLogScanner {
240            inner: Arc::new(inner),
241        })
242    }
243}
244
/// Scanner for reading log records one at a time with per-record metadata.
///
/// Use this scanner when you need access to individual record offsets and timestamps.
/// For batch-level access, use [`RecordBatchLogScanner`] instead.
pub struct LogScanner {
    /// Shared scanner implementation; every public method delegates to it.
    inner: Arc<LogScannerInner>,
}
252
/// Scanner for reading log data as Arrow RecordBatches.
///
/// More efficient than [`LogScanner`] for batch-level analytics where per-record
/// metadata (offsets, timestamps) is not needed.
pub struct RecordBatchLogScanner {
    /// Shared scanner implementation; every public method delegates to it.
    inner: Arc<LogScannerInner>,
}
260
/// Private shared implementation for both scanner types
struct LogScannerInner {
    /// Path of the scanned table; used to refresh table metadata when subscribing.
    table_path: TablePath,
    /// Id of the scanned table; used to build the `TableBucket` keys for subscriptions.
    table_id: TableId,
    /// Shared cluster/table metadata handle.
    metadata: Arc<Metadata>,
    /// Subscription state (assigned buckets and their offsets), shared with the fetcher.
    log_scanner_status: Arc<LogScannerStatus>,
    /// Sends fetch requests and collects the fetched records/batches.
    log_fetcher: LogFetcher,
    /// Whether the table is partitioned; decides which subscribe/unsubscribe variants are allowed.
    is_partitioned_table: bool,
}
270
271impl LogScannerInner {
272    fn new(
273        table_info: &TableInfo,
274        metadata: Arc<Metadata>,
275        connections: Arc<RpcClient>,
276        config: &crate::config::Config,
277        projected_fields: Option<Vec<usize>>,
278    ) -> Result<Self> {
279        let log_scanner_status = Arc::new(LogScannerStatus::new());
280        Ok(Self {
281            table_path: table_info.table_path.clone(),
282            table_id: table_info.table_id,
283            is_partitioned_table: table_info.is_partitioned(),
284            metadata: metadata.clone(),
285            log_scanner_status: log_scanner_status.clone(),
286            log_fetcher: LogFetcher::new(
287                table_info.clone(),
288                connections.clone(),
289                metadata.clone(),
290                log_scanner_status.clone(),
291                config,
292                projected_fields,
293            )?,
294        })
295    }
296
297    async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
298        let start = Instant::now();
299        let deadline = start + timeout;
300
301        loop {
302            // Try to collect fetches
303            let fetch_result = self.poll_for_fetches().await?;
304
305            if !fetch_result.is_empty() {
306                // We have data, send next round of fetches and return
307                // This enables pipelining while user processes the data
308                self.log_fetcher.send_fetches().await?;
309                return Ok(ScanRecords::new(fetch_result));
310            }
311
312            // No data available, check if we should wait
313            let now = Instant::now();
314            if now >= deadline {
315                // Timeout reached, return empty result
316                return Ok(ScanRecords::new(HashMap::new()));
317            }
318
319            // Wait for buffer to become non-empty with remaining time
320            let remaining = deadline - now;
321            let has_data = self
322                .log_fetcher
323                .log_fetch_buffer
324                .await_not_empty(remaining)
325                .await?;
326
327            if !has_data {
328                // Timeout while waiting
329                return Ok(ScanRecords::new(HashMap::new()));
330            }
331
332            // Buffer became non-empty, try again
333        }
334    }
335
336    async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
337        if self.is_partitioned_table {
338            return Err(Error::UnsupportedOperation {
339                message: "The table is a partitioned table, please use \"subscribe_partition\" to \
340                subscribe a partitioned bucket instead."
341                    .to_string(),
342            });
343        }
344        let table_bucket = TableBucket::new(self.table_id, bucket);
345        self.metadata
346            .check_and_update_table_metadata(from_ref(&self.table_path))
347            .await?;
348        self.log_scanner_status
349            .assign_scan_bucket(table_bucket, offset);
350        Ok(())
351    }
352
353    async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
354        if self.is_partitioned_table {
355            return Err(Error::UnsupportedOperation {
356                message:
357                    "The table is a partitioned table, please use \"subscribe_partition_buckets\" instead."
358                        .to_string(),
359            });
360        }
361
362        let mut scan_bucket_offsets = HashMap::new();
363        for (bucket_id, offset) in bucket_offsets {
364            let table_bucket = TableBucket::new(self.table_id, *bucket_id);
365            scan_bucket_offsets.insert(table_bucket, *offset);
366        }
367        self.do_subscribe_buckets(scan_bucket_offsets).await
368    }
369
370    async fn subscribe_partition(
371        &self,
372        partition_id: PartitionId,
373        bucket: i32,
374        offset: i64,
375    ) -> Result<()> {
376        if !self.is_partitioned_table {
377            return Err(Error::UnsupportedOperation {
378                message: "The table is not a partitioned table, please use \"subscribe\" to \
379                subscribe a non-partitioned bucket instead."
380                    .to_string(),
381            });
382        }
383        let table_bucket =
384            TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
385        self.metadata
386            .check_and_update_table_metadata(from_ref(&self.table_path))
387            .await?;
388        self.log_scanner_status
389            .assign_scan_bucket(table_bucket, offset);
390        Ok(())
391    }
392
393    async fn subscribe_partition_buckets(
394        &self,
395        partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
396    ) -> Result<()> {
397        if !self.is_partitioned_table {
398            return Err(UnsupportedOperation {
399                message: "The table is not a partitioned table, please use \"subscribe_buckets\" \
400                    to subscribe to non-partitioned buckets instead."
401                    .to_string(),
402            });
403        }
404
405        let mut scan_bucket_offsets = HashMap::new();
406        for (&(partition_id, bucket_id), &offset) in partition_bucket_offsets {
407            let table_bucket =
408                TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket_id);
409            scan_bucket_offsets.insert(table_bucket, offset);
410        }
411        self.do_subscribe_buckets(scan_bucket_offsets).await
412    }
413
414    async fn do_subscribe_buckets(&self, bucket_offsets: HashMap<TableBucket, i64>) -> Result<()> {
415        if bucket_offsets.is_empty() {
416            return Err(Error::UnexpectedError {
417                message: "Bucket offsets are empty.".to_string(),
418                source: None,
419            });
420        }
421
422        self.metadata
423            .check_and_update_table_metadata(from_ref(&self.table_path))
424            .await?;
425
426        self.log_scanner_status.assign_scan_buckets(bucket_offsets);
427        Ok(())
428    }
429
430    async fn unsubscribe(&self, bucket: i32) -> Result<()> {
431        if self.is_partitioned_table {
432            return Err(Error::UnsupportedOperation {
433                message:
434                    "The table is a partitioned table, please use \"unsubscribe_partition\" to \
435                    unsubscribe a partitioned bucket instead."
436                        .to_string(),
437            });
438        }
439        let table_bucket = TableBucket::new(self.table_id, bucket);
440        self.log_scanner_status
441            .unassign_scan_buckets(from_ref(&table_bucket));
442        Ok(())
443    }
444
445    async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
446        if !self.is_partitioned_table {
447            return Err(Error::UnsupportedOperation {
448                message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
449            });
450        }
451        let table_bucket =
452            TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
453        self.log_scanner_status
454            .unassign_scan_buckets(from_ref(&table_bucket));
455        Ok(())
456    }
457
458    async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
459        let result = self.log_fetcher.collect_fetches()?;
460        if !result.is_empty() {
461            return Ok(result);
462        }
463
464        // send any new fetches (won't resend pending fetches).
465        self.log_fetcher.send_fetches().await?;
466
467        // Collect completed fetches from buffer
468        self.log_fetcher.collect_fetches()
469    }
470
471    async fn poll_batches(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
472        let start = Instant::now();
473        let deadline = start + timeout;
474
475        loop {
476            let batches = self.poll_for_batches().await?;
477
478            if !batches.is_empty() {
479                self.log_fetcher.send_fetches().await?;
480                return Ok(batches);
481            }
482
483            let now = Instant::now();
484            if now >= deadline {
485                return Ok(Vec::new());
486            }
487
488            let remaining = deadline - now;
489            let has_data = self
490                .log_fetcher
491                .log_fetch_buffer
492                .await_not_empty(remaining)
493                .await?;
494
495            if !has_data {
496                return Ok(Vec::new());
497            }
498        }
499    }
500
501    async fn poll_for_batches(&self) -> Result<Vec<ScanBatch>> {
502        let result = self.log_fetcher.collect_batches()?;
503        if !result.is_empty() {
504            return Ok(result);
505        }
506
507        self.log_fetcher.send_fetches().await?;
508        self.log_fetcher.collect_batches()
509    }
510}
511
// Implementation for LogScanner (records mode)
impl LogScanner {
    /// Polls for records, waiting up to `timeout`.
    ///
    /// Returns an empty `ScanRecords` if no data became available in time.
    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
        self.inner.poll_records(timeout).await
    }

    /// Subscribes to `bucket` of a non-partitioned table, starting at `offset`.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is partitioned.
    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
        self.inner.subscribe(bucket, offset).await
    }

    /// Subscribes to several buckets of a non-partitioned table; the map goes
    /// from bucket id to start offset.
    ///
    /// # Errors
    /// Returns an error if the table is partitioned or the map is empty.
    pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
        self.inner.subscribe_buckets(bucket_offsets).await
    }

    /// Subscribes to `bucket` of partition `partition_id`, starting at `offset`.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is not partitioned.
    pub async fn subscribe_partition(
        &self,
        partition_id: PartitionId,
        bucket: i32,
        offset: i64,
    ) -> Result<()> {
        self.inner
            .subscribe_partition(partition_id, bucket, offset)
            .await
    }

    /// Subscribes to several partition buckets; the map goes from
    /// `(partition id, bucket id)` to start offset.
    ///
    /// # Errors
    /// Returns an error if the table is not partitioned or the map is empty.
    pub async fn subscribe_partition_buckets(
        &self,
        partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
    ) -> Result<()> {
        self.inner
            .subscribe_partition_buckets(partition_bucket_offsets)
            .await
    }

    /// Unsubscribes from `bucket` of a non-partitioned table.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is partitioned.
    pub async fn unsubscribe(&self, bucket: i32) -> Result<()> {
        self.inner.unsubscribe(bucket).await
    }

    /// Unsubscribes from `bucket` of partition `partition_id`.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is not partitioned.
    pub async fn unsubscribe_partition(
        &self,
        partition_id: PartitionId,
        bucket: i32,
    ) -> Result<()> {
        self.inner.unsubscribe_partition(partition_id, bucket).await
    }
}
558
// Implementation for RecordBatchLogScanner (batches mode)
impl RecordBatchLogScanner {
    /// Poll for batches with metadata (bucket and offset information).
    ///
    /// Waits up to `timeout`; returns an empty vector if no data became available in time.
    pub async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
        self.inner.poll_batches(timeout).await
    }

    /// Subscribes to `bucket` of a non-partitioned table, starting at `offset`.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is partitioned.
    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
        self.inner.subscribe(bucket, offset).await
    }

    /// Subscribes to several buckets of a non-partitioned table; the map goes
    /// from bucket id to start offset.
    ///
    /// # Errors
    /// Returns an error if the table is partitioned or the map is empty.
    pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
        self.inner.subscribe_buckets(bucket_offsets).await
    }

    /// Subscribes to `bucket` of partition `partition_id`, starting at `offset`.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is not partitioned.
    pub async fn subscribe_partition(
        &self,
        partition_id: PartitionId,
        bucket: i32,
        offset: i64,
    ) -> Result<()> {
        self.inner
            .subscribe_partition(partition_id, bucket, offset)
            .await
    }

    /// Returns whether the table is partitioned
    pub fn is_partitioned(&self) -> bool {
        self.inner.is_partitioned_table
    }

    /// Returns all subscribed buckets with their current offsets
    pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
        self.inner.log_scanner_status.get_all_subscriptions()
    }

    /// Subscribes to several partition buckets; the map goes from
    /// `(partition id, bucket id)` to start offset.
    ///
    /// # Errors
    /// Returns an error if the table is not partitioned or the map is empty.
    pub async fn subscribe_partition_buckets(
        &self,
        partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
    ) -> Result<()> {
        self.inner
            .subscribe_partition_buckets(partition_bucket_offsets)
            .await
    }

    /// Unsubscribes from `bucket` of a non-partitioned table.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is partitioned.
    pub async fn unsubscribe(&self, bucket: i32) -> Result<()> {
        self.inner.unsubscribe(bucket).await
    }

    /// Unsubscribes from `bucket` of partition `partition_id`.
    ///
    /// # Errors
    /// Returns `Error::UnsupportedOperation` if the table is not partitioned.
    pub async fn unsubscribe_partition(
        &self,
        partition_id: PartitionId,
        bucket: i32,
    ) -> Result<()> {
        self.inner.unsubscribe_partition(partition_id, bucket).await
    }
}
616
/// Issues fetch requests for the subscribed buckets and exposes the results
/// through its `LogFetchBuffer`.
struct LogFetcher {
    /// RPC client handle used to reach the servers.
    conns: Arc<RpcClient>,
    /// Shared cluster/table metadata; refreshed when a bucket has no known leader.
    metadata: Arc<Metadata>,
    /// Path of the table being fetched.
    table_path: TablePath,
    /// Whether the table is partitioned; selects the metadata refresh strategy.
    is_partitioned: bool,
    /// Subscription state shared with the owning scanner.
    log_scanner_status: Arc<LogScannerStatus>,
    /// Read context built with `is_from_remote = false`.
    read_context: ReadContext,
    /// Read context built with `is_from_remote = true`.
    remote_read_context: ReadContext,
    /// Downloader for remote log data, backed by a temporary directory created in `new`.
    remote_log_downloader: Arc<RemoteLogDownloader>,
    /// Background security token manager for remote filesystem access.
    /// Kept alive to run the background refresh task; stopped on drop.
    #[allow(dead_code)]
    security_token_manager: Arc<SecurityTokenManager>,
    /// Buffer that the scanner waits on and collects fetched data from.
    log_fetch_buffer: Arc<LogFetchBuffer>,
    /// Ids of nodes for which a fetch request has been registered by `send_fetches`.
    nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
    /// Copied from `config.scanner_log_max_poll_records`.
    max_poll_records: usize,
    /// Copied from `config.scanner_log_fetch_max_bytes`.
    fetch_max_bytes: i32,
    /// Copied from `config.scanner_log_fetch_min_bytes`.
    fetch_min_bytes: i32,
    /// Copied from `config.scanner_log_fetch_wait_max_time_ms`.
    fetch_wait_max_time_ms: i32,
    /// Copied from `config.scanner_log_fetch_max_bytes_for_bucket`.
    fetch_max_bytes_for_bucket: i32,
}
638
/// Bundle of shared handles mirroring the corresponding `LogFetcher` fields.
///
/// NOTE(review): presumably handed to the code that processes a fetch response;
/// the usage is outside this part of the file — confirm.
struct FetchResponseContext {
    /// Shared cluster/table metadata.
    metadata: Arc<Metadata>,
    /// Buffer that fetched data is placed into.
    log_fetch_buffer: Arc<LogFetchBuffer>,
    /// Subscription state of the scanner.
    log_scanner_status: Arc<LogScannerStatus>,
    /// Read context for locally fetched data.
    read_context: ReadContext,
    /// Read context for data read from remote log storage.
    remote_read_context: ReadContext,
    /// Downloader for remote log data.
    remote_log_downloader: Arc<RemoteLogDownloader>,
}
647
648impl LogFetcher {
649    pub fn new(
650        table_info: TableInfo,
651        conns: Arc<RpcClient>,
652        metadata: Arc<Metadata>,
653        log_scanner_status: Arc<LogScannerStatus>,
654        config: &crate::config::Config,
655        projected_fields: Option<Vec<usize>>,
656    ) -> Result<Self> {
657        let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
658        let read_context =
659            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?;
660        let remote_read_context =
661            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?;
662
663        let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
664        let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone()));
665
666        // Create security token manager for background token refresh
667        let security_token_manager =
668            Arc::new(SecurityTokenManager::new(conns.clone(), metadata.clone()));
669
670        // Subscribe to credentials updates and pass to remote log downloader
671        let credentials_rx = security_token_manager.subscribe();
672
673        let remote_log_downloader = Arc::new(RemoteLogDownloader::new(
674            tmp_dir,
675            config.scanner_remote_log_prefetch_num,
676            config.remote_file_download_thread_num,
677            config.scanner_remote_log_read_concurrency,
678            credentials_rx,
679        )?);
680
681        // Start the background token refresh task
682        security_token_manager.start();
683
684        Ok(LogFetcher {
685            conns: conns.clone(),
686            metadata: metadata.clone(),
687            table_path: table_info.table_path.clone(),
688            is_partitioned: table_info.is_partitioned(),
689            log_scanner_status,
690            read_context,
691            remote_read_context,
692            remote_log_downloader,
693            security_token_manager,
694            log_fetch_buffer,
695            nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
696            max_poll_records: config.scanner_log_max_poll_records,
697            fetch_max_bytes: config.scanner_log_fetch_max_bytes,
698            fetch_min_bytes: config.scanner_log_fetch_min_bytes,
699            fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
700            fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
701        })
702    }
703
704    fn create_read_context(
705        full_arrow_schema: SchemaRef,
706        projected_fields: Option<Vec<usize>>,
707        is_from_remote: bool,
708    ) -> Result<ReadContext> {
709        match projected_fields {
710            None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
711            Some(fields) => {
712                ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
713            }
714        }
715    }
716
717    fn describe_fetch_error(
718        error: FlussError,
719        table_bucket: &TableBucket,
720        fetch_offset: i64,
721        error_message: &str,
722    ) -> FetchErrorContext {
723        match error {
724            FlussError::NotLeaderOrFollower
725            | FlussError::LogStorageException
726            | FlussError::KvStorageException
727            | FlussError::StorageException
728            | FlussError::FencedLeaderEpochException
729            | FlussError::LeaderNotAvailableException => FetchErrorContext {
730                action: FetchErrorAction::Ignore,
731                log_level: FetchErrorLogLevel::Debug,
732                log_message: format!(
733                    "Error in fetch for bucket {table_bucket}: {error:?}: {error_message}"
734                ),
735            },
736            FlussError::UnknownTableOrBucketException => FetchErrorContext {
737                action: FetchErrorAction::Ignore,
738                log_level: FetchErrorLogLevel::Warn,
739                log_message: format!(
740                    "Received unknown table or bucket error in fetch for bucket {table_bucket}"
741                ),
742            },
743            FlussError::LogOffsetOutOfRangeException => FetchErrorContext {
744                action: FetchErrorAction::LogOffsetOutOfRange,
745                log_level: FetchErrorLogLevel::Debug,
746                log_message: format!(
747                    "The fetching offset {fetch_offset} is out of range for bucket {table_bucket}: {error_message}"
748                ),
749            },
750            FlussError::AuthorizationException => FetchErrorContext {
751                action: FetchErrorAction::Authorization,
752                log_level: FetchErrorLogLevel::Debug,
753                log_message: format!(
754                    "Authorization error while fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
755                ),
756            },
757            FlussError::UnknownServerError => FetchErrorContext {
758                action: FetchErrorAction::Ignore,
759                log_level: FetchErrorLogLevel::Warn,
760                log_message: format!(
761                    "Unknown server error while fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
762                ),
763            },
764            FlussError::CorruptMessage => FetchErrorContext {
765                action: FetchErrorAction::CorruptMessage,
766                log_level: FetchErrorLogLevel::Debug,
767                log_message: format!(
768                    "Encountered corrupt message when fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
769                ),
770            },
771            _ => FetchErrorContext {
772                action: FetchErrorAction::Unexpected,
773                log_level: FetchErrorLogLevel::Debug,
774                log_message: format!(
775                    "Unexpected error code {error:?} while fetching at offset {fetch_offset} from bucket {table_bucket}: {error_message}"
776                ),
777            },
778        }
779    }
780
781    fn should_invalidate_table_meta(error: FlussError) -> bool {
782        matches!(
783            error,
784            FlussError::NotLeaderOrFollower
785                | FlussError::LeaderNotAvailableException
786                | FlussError::FencedLeaderEpochException
787                | FlussError::UnknownTableOrBucketException
788                | FlussError::InvalidCoordinatorException
789        )
790    }
791
792    async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> {
793        let mut partition_ids = Vec::new();
794        let mut need_update = false;
795
796        for tb in table_buckets {
797            if self.get_table_bucket_leader(tb).is_some() {
798                continue;
799            }
800
801            if self.is_partitioned {
802                partition_ids.push(tb.partition_id().unwrap());
803            } else {
804                need_update = true;
805                break;
806            }
807        }
808
809        let update_result = if self.is_partitioned && !partition_ids.is_empty() {
810            self.metadata
811                .update_tables_metadata(
812                    &HashSet::from([&self.table_path]),
813                    &HashSet::new(),
814                    partition_ids,
815                )
816                .await
817        } else if need_update {
818            self.metadata.update_table_metadata(&self.table_path).await
819        } else {
820            Ok(())
821        };
822
823        // TODO: Handle PartitionNotExist error like java side
824        update_result.or_else(|e| {
825            if let Error::RpcError { source, .. } = &e
826                && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
827            {
828                warn!("Retrying after encountering error while updating table metadata: {e}");
829                Ok(())
830            } else {
831                Err(e)
832            }
833        })?;
834        Ok(())
835    }
836
837    /// Send fetch requests asynchronously without waiting for responses
838    async fn send_fetches(&self) -> Result<()> {
839        self.check_and_update_metadata(self.fetchable_buckets().as_slice())
840            .await?;
841        let fetch_request = self.prepare_fetch_log_requests().await;
842
843        for (leader, fetch_request) in fetch_request {
844            debug!("Adding pending request for node id {leader}");
845            // Check if we already have a pending request for this node
846            {
847                self.nodes_with_pending_fetch_requests.lock().insert(leader);
848            }
849
850            let cluster = self.metadata.get_cluster().clone();
851
852            let conns = Arc::clone(&self.conns);
853            let log_fetch_buffer = self.log_fetch_buffer.clone();
854            let log_scanner_status = self.log_scanner_status.clone();
855            let read_context = self.read_context.clone();
856            let remote_read_context = self.remote_read_context.clone();
857            let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
858            let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
859            let metadata = self.metadata.clone();
860            let response_context = FetchResponseContext {
861                metadata: metadata.clone(),
862                log_fetch_buffer,
863                log_scanner_status,
864                read_context,
865                remote_read_context,
866                remote_log_downloader,
867            };
868            // Spawn async task to handle the fetch request
869            // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
870            // This is acceptable because:
871            // 1. Tasks will naturally complete (network requests will return or timeout)
872            // 2. Tasks use Arc references, so resources are properly shared
873            // 3. When the program exits, tokio runtime will clean up all tasks
874            // 4. Tasks are short-lived (network I/O operations)
875            tokio::spawn(async move {
876                // make sure it will always remove leader from pending nodes
877                let _guard = scopeguard::guard((), |_| {
878                    nodes_with_pending.lock().remove(&leader);
879                });
880
881                let server_node = match cluster.get_tablet_server(leader) {
882                    Some(node) => node,
883                    None => {
884                        warn!("No server node found for leader {leader}, retrying");
885                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
886                        return;
887                    }
888                };
889
890                let con = match conns.get_connection(server_node).await {
891                    Ok(con) => con,
892                    Err(e) => {
893                        warn!("Retrying after error getting connection to destination node: {e:?}");
894                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
895                        return;
896                    }
897                };
898
899                let fetch_response = match con
900                    .request(message::FetchLogRequest::new(fetch_request.clone()))
901                    .await
902                {
903                    Ok(resp) => resp,
904                    Err(e) => {
905                        warn!(
906                            "Retrying after error fetching log from destination node {server_node:?}: {e:?}"
907                        );
908                        Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
909                        return;
910                    }
911                };
912
913                Self::handle_fetch_response(fetch_response, response_context).await;
914            });
915        }
916
917        Ok(())
918    }
919
920    async fn handle_fetch_failure(
921        metadata: Arc<Metadata>,
922        server_id: &i32,
923        request: &FetchLogRequest,
924    ) {
925        let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
926        metadata.invalidate_server(server_id, table_ids);
927    }
928
929    /// Handle fetch response and add completed fetches to buffer
930    async fn handle_fetch_response(
931        fetch_response: crate::proto::FetchLogResponse,
932        context: FetchResponseContext,
933    ) {
934        let FetchResponseContext {
935            metadata,
936            log_fetch_buffer,
937            log_scanner_status,
938            read_context,
939            remote_read_context,
940            remote_log_downloader,
941        } = context;
942
943        for pb_fetch_log_resp in fetch_response.tables_resp {
944            let table_id = pb_fetch_log_resp.table_id;
945            let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
946
947            for fetch_log_for_bucket in fetch_log_for_buckets {
948                let bucket: i32 = fetch_log_for_bucket.bucket_id;
949                let table_bucket = TableBucket::new_with_partition(
950                    table_id,
951                    fetch_log_for_bucket.partition_id,
952                    bucket,
953                );
954
955                // todo: check fetch result code for per-bucket
956                let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else {
957                    debug!(
958                        "Ignoring fetch log response for bucket {table_bucket} because the bucket has been unsubscribed."
959                    );
960                    continue;
961                };
962
963                if let Some(error_code) = fetch_log_for_bucket.error_code
964                    && error_code != FlussError::None.code()
965                {
966                    let api_error: ApiError = ErrorResponse {
967                        error_code,
968                        error_message: fetch_log_for_bucket.error_message.clone(),
969                    }
970                    .into();
971
972                    let error = FlussError::for_code(error_code);
973                    if Self::should_invalidate_table_meta(error) {
974                        // TODO: Consider triggering table meta invalidation from sender/lookup paths.
975                        let table_id = table_bucket.table_id();
976                        let cluster = metadata.get_cluster();
977                        if let Some(table_path) = cluster.get_table_path_by_id(table_id) {
978                            let physical_tables = HashSet::from([PhysicalTablePath::of(Arc::new(
979                                table_path.clone(),
980                            ))]);
981                            metadata.invalidate_physical_table_meta(&physical_tables);
982                        } else {
983                            warn!(
984                                "Table id {table_id} is missing from table_path_by_id while invalidating table metadata"
985                            );
986                        }
987                    }
988                    let error_context = Self::describe_fetch_error(
989                        error,
990                        &table_bucket,
991                        fetch_offset,
992                        api_error.message.as_str(),
993                    );
994                    log_scanner_status.move_bucket_to_end(table_bucket.clone());
995                    match error_context.log_level {
996                        FetchErrorLogLevel::Debug => {
997                            debug!("{}", error_context.log_message);
998                        }
999                        FetchErrorLogLevel::Warn => {
1000                            warn!("{}", error_context.log_message);
1001                        }
1002                    }
1003                    log_fetch_buffer.add_api_error(
1004                        table_bucket.clone(),
1005                        api_error,
1006                        error_context,
1007                        fetch_offset,
1008                    );
1009                    continue;
1010                }
1011
1012                // Check if this is a remote log fetch
1013                if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info
1014                {
1015                    // Remote fs props are already set by the background SecurityTokenManager
1016                    let remote_fetch_info =
1017                        RemoteLogFetchInfo::from_proto(remote_log_fetch_info, table_bucket.clone());
1018
1019                    let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
1020                    Self::pending_remote_fetches(
1021                        remote_log_downloader.clone(),
1022                        log_fetch_buffer.clone(),
1023                        remote_read_context.clone(),
1024                        &table_bucket,
1025                        remote_fetch_info,
1026                        fetch_offset,
1027                        high_watermark,
1028                    );
1029                } else if fetch_log_for_bucket.records.is_some() {
1030                    // Handle regular in-memory records - create completed fetch directly
1031                    let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
1032                    let records = fetch_log_for_bucket.records.unwrap_or(vec![]);
1033                    let size_in_bytes = records.len();
1034                    let log_record_batch = LogRecordsBatches::new(records);
1035
1036                    let completed_fetch = DefaultCompletedFetch::new(
1037                        table_bucket.clone(),
1038                        log_record_batch,
1039                        size_in_bytes,
1040                        read_context.clone(),
1041                        fetch_offset,
1042                        high_watermark,
1043                    );
1044                    log_fetch_buffer.add(Box::new(completed_fetch));
1045                }
1046            }
1047        }
1048    }
1049
1050    fn pending_remote_fetches(
1051        remote_log_downloader: Arc<RemoteLogDownloader>,
1052        log_fetch_buffer: Arc<LogFetchBuffer>,
1053        read_context: ReadContext,
1054        table_bucket: &TableBucket,
1055        remote_fetch_info: RemoteLogFetchInfo,
1056        fetch_offset: i64,
1057        high_watermark: i64,
1058    ) {
1059        // Download and process remote log segments
1060        let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
1061        let mut current_fetch_offset = fetch_offset;
1062        for (i, segment) in remote_fetch_info.remote_log_segments.iter().enumerate() {
1063            if i > 0 {
1064                pos_in_log_segment = 0;
1065                current_fetch_offset = segment.start_offset;
1066            }
1067
1068            // todo:
1069            // 1: control the max threads to download remote segment
1070            // 2: introduce priority queue to priority highest for earliest segment
1071            let download_future = remote_log_downloader
1072                .request_remote_log(&remote_fetch_info.remote_log_tablet_dir, segment);
1073
1074            // Register callback to be called when download completes
1075            // (similar to Java's downloadFuture.onComplete)
1076            // This must be done before creating RemotePendingFetch to avoid move issues
1077            let table_bucket = table_bucket.clone();
1078            let log_fetch_buffer_clone = log_fetch_buffer.clone();
1079            download_future.on_complete(move || {
1080                log_fetch_buffer_clone.try_complete(&table_bucket);
1081            });
1082
1083            let pending_fetch = RemotePendingFetch::new(
1084                segment.clone(),
1085                download_future,
1086                pos_in_log_segment,
1087                current_fetch_offset,
1088                high_watermark,
1089                read_context.clone(),
1090            );
1091            // Add to pending fetches in buffer (similar to Java's logFetchBuffer.pend)
1092            log_fetch_buffer.pend(Box::new(pending_fetch));
1093        }
1094    }
1095
1096    /// Collect completed fetches from buffer
1097    /// Reference: LogFetchCollector.collectFetch in Java
1098    fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
1099        let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
1100        let mut records_remaining = self.max_poll_records;
1101
1102        let collect_result: Result<()> = {
1103            while records_remaining > 0 {
1104                // Get the next in line fetch, or get a new one from buffer
1105                let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
1106
1107                if next_in_line.is_none() || next_in_line.as_ref().unwrap().is_consumed() {
1108                    // Get a new fetch from buffer
1109                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
1110                        // Initialize the fetch if not already initialized
1111                        if !completed_fetch.is_initialized() {
1112                            let size_in_bytes = completed_fetch.size_in_bytes();
1113                            match self.initialize_fetch(completed_fetch) {
1114                                Ok(initialized) => {
1115                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
1116                                    continue;
1117                                }
1118                                Err(e) => {
1119                                    // Remove a completedFetch upon a parse with exception if
1120                                    // (1) it contains no records, and
1121                                    // (2) there are no fetched records with actual content preceding this
1122                                    // exception.
1123                                    if result.is_empty() && size_in_bytes == 0 {
1124                                        // todo: do we need to consider it like java ?
1125                                        // self.log_fetch_buffer.poll();
1126                                    }
1127                                    return Err(e);
1128                                }
1129                            }
1130                        } else {
1131                            self.log_fetch_buffer
1132                                .set_next_in_line_fetch(Some(completed_fetch));
1133                        }
1134                        // Note: poll() already removed the fetch from buffer, so no need to call poll()
1135                    } else {
1136                        // No more fetches available
1137                        break;
1138                    }
1139                } else {
1140                    // Fetch records from next_in_line
1141                    if let Some(mut next_fetch) = next_in_line {
1142                        let records = match self
1143                            .fetch_records_from_fetch(&mut next_fetch, records_remaining)
1144                        {
1145                            Ok(records) => records,
1146                            Err(e) => {
1147                                if !next_fetch.is_consumed() {
1148                                    self.log_fetch_buffer
1149                                        .set_next_in_line_fetch(Some(next_fetch));
1150                                }
1151                                return Err(e);
1152                            }
1153                        };
1154
1155                        if !records.is_empty() {
1156                            let table_bucket = next_fetch.table_bucket().clone();
1157                            // Merge with existing records for this bucket
1158                            let existing = result.entry(table_bucket).or_default();
1159                            let records_count = records.len();
1160                            existing.extend(records);
1161
1162                            records_remaining = records_remaining.saturating_sub(records_count);
1163                        }
1164
1165                        // If the fetch is not fully consumed, put it back for the next round
1166                        if !next_fetch.is_consumed() {
1167                            self.log_fetch_buffer
1168                                .set_next_in_line_fetch(Some(next_fetch));
1169                        }
1170                        // If consumed, next_fetch will be dropped here (which is correct)
1171                    }
1172                }
1173            }
1174            Ok(())
1175        };
1176
1177        match collect_result {
1178            Ok(()) => Ok(result),
1179            Err(e) => {
1180                if result.is_empty() {
1181                    Err(e)
1182                } else {
1183                    Ok(result)
1184                }
1185            }
1186        }
1187    }
1188
1189    /// Initialize a completed fetch, checking offset match and updating high watermark
1190    fn initialize_fetch(
1191        &self,
1192        mut completed_fetch: Box<dyn CompletedFetch>,
1193    ) -> Result<Option<Box<dyn CompletedFetch>>> {
1194        if let Some(error) = completed_fetch.take_error() {
1195            return Err(error);
1196        }
1197
1198        let table_bucket = completed_fetch.table_bucket().clone();
1199        let fetch_offset = completed_fetch.next_fetch_offset();
1200
1201        if let Some(api_error) = completed_fetch.api_error() {
1202            let error = FlussError::for_code(api_error.code);
1203            let error_message = api_error.message.as_str();
1204            self.log_scanner_status
1205                .move_bucket_to_end(table_bucket.clone());
1206            let action = completed_fetch
1207                .fetch_error_context()
1208                .map(|context| context.action)
1209                .unwrap_or(FetchErrorAction::Unexpected);
1210            match action {
1211                FetchErrorAction::Ignore => {
1212                    return Ok(None);
1213                }
1214                FetchErrorAction::LogOffsetOutOfRange => {
1215                    return Err(Error::UnexpectedError {
1216                        message: format!(
1217                            "The fetching offset {fetch_offset} is out of range: {error_message}"
1218                        ),
1219                        source: None,
1220                    });
1221                }
1222                FetchErrorAction::Authorization => {
1223                    return Err(Error::FlussAPIError {
1224                        api_error: ApiError {
1225                            code: api_error.code,
1226                            message: api_error.message.to_string(),
1227                        },
1228                    });
1229                }
1230                FetchErrorAction::CorruptMessage => {
1231                    return Err(Error::UnexpectedError {
1232                        message: format!(
1233                            "Encountered corrupt message when fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
1234                        ),
1235                        source: None,
1236                    });
1237                }
1238                FetchErrorAction::Unexpected => {
1239                    return Err(Error::UnexpectedError {
1240                        message: format!(
1241                            "Unexpected error code {error:?} while fetching at offset {fetch_offset} from bucket {table_bucket}: {error_message}"
1242                        ),
1243                        source: None,
1244                    });
1245                }
1246            }
1247        }
1248
1249        // Check if bucket is still subscribed
1250        let Some(current_offset) = self.log_scanner_status.get_bucket_offset(&table_bucket) else {
1251            warn!(
1252                "Discarding stale fetch response for bucket {table_bucket:?} since the bucket has been unsubscribed"
1253            );
1254            return Ok(None);
1255        };
1256
1257        // Check if offset matches
1258        if fetch_offset != current_offset {
1259            warn!(
1260                "Discarding stale fetch response for bucket {table_bucket:?} since its offset {fetch_offset} does not match the expected offset {current_offset}"
1261            );
1262            return Ok(None);
1263        }
1264
1265        // Update high watermark
1266        let high_watermark = completed_fetch.high_watermark();
1267        if high_watermark >= 0 {
1268            self.log_scanner_status
1269                .update_high_watermark(&table_bucket, high_watermark);
1270        }
1271
1272        completed_fetch.set_initialized();
1273        Ok(Some(completed_fetch))
1274    }
1275
1276    /// Fetch records from a completed fetch, checking offset match
1277    fn fetch_records_from_fetch(
1278        &self,
1279        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
1280        max_records: usize,
1281    ) -> Result<Vec<ScanRecord>> {
1282        let table_bucket = next_in_line_fetch.table_bucket().clone();
1283        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
1284
1285        if current_offset.is_none() {
1286            warn!(
1287                "Ignoring fetched records for {table_bucket:?} since the bucket has been unsubscribed"
1288            );
1289            next_in_line_fetch.drain();
1290            return Ok(Vec::new());
1291        }
1292
1293        let current_offset = current_offset.unwrap();
1294        let fetch_offset = next_in_line_fetch.next_fetch_offset();
1295
1296        // Check if this fetch is next in line
1297        if fetch_offset == current_offset {
1298            let records = next_in_line_fetch.fetch_records(max_records)?;
1299            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
1300
1301            if next_fetch_offset > current_offset {
1302                self.log_scanner_status
1303                    .update_offset(&table_bucket, next_fetch_offset);
1304            }
1305
1306            if next_in_line_fetch.is_consumed() && next_in_line_fetch.records_read() > 0 {
1307                self.log_scanner_status
1308                    .move_bucket_to_end(table_bucket.clone());
1309            }
1310
1311            Ok(records)
1312        } else {
1313            // These records aren't next in line, ignore them
1314            warn!(
1315                "Ignoring fetched records for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
1316            );
1317            next_in_line_fetch.drain();
1318            Ok(Vec::new())
1319        }
1320    }
1321
1322    /// Collect completed fetches as ScanBatches (with bucket and offset metadata)
1323    fn collect_batches(&self) -> Result<Vec<ScanBatch>> {
1324        // Limit memory usage with both batch count and byte size constraints.
1325        // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
1326        const MAX_BATCHES: usize = 100;
1327        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
1328        let mut result: Vec<ScanBatch> = Vec::new();
1329        let mut batches_remaining = MAX_BATCHES;
1330        let mut bytes_consumed: usize = 0;
1331
1332        let collect_result: Result<()> = {
1333            while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
1334                let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
1335
1336                match next_in_line {
1337                    Some(mut next_fetch) if !next_fetch.is_consumed() => {
1338                        let scan_batches =
1339                            self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
1340                        let batch_count = scan_batches.len();
1341
1342                        if !scan_batches.is_empty() {
1343                            // Track bytes consumed (soft cap - may exceed by one fetch)
1344                            let batch_bytes: usize = scan_batches
1345                                .iter()
1346                                .map(|sb| sb.batch().get_array_memory_size())
1347                                .sum();
1348                            bytes_consumed += batch_bytes;
1349
1350                            result.extend(scan_batches);
1351                            batches_remaining = batches_remaining.saturating_sub(batch_count);
1352                        }
1353
1354                        if !next_fetch.is_consumed() {
1355                            self.log_fetch_buffer
1356                                .set_next_in_line_fetch(Some(next_fetch));
1357                        }
1358                    }
1359                    _ => {
1360                        if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
1361                            if !completed_fetch.is_initialized() {
1362                                let size_in_bytes = completed_fetch.size_in_bytes();
1363                                match self.initialize_fetch(completed_fetch) {
1364                                    Ok(initialized) => {
1365                                        self.log_fetch_buffer.set_next_in_line_fetch(initialized);
1366                                        continue;
1367                                    }
1368                                    Err(e) => {
1369                                        if result.is_empty() && size_in_bytes == 0 {
1370                                            continue;
1371                                        }
1372                                        return Err(e);
1373                                    }
1374                                }
1375                            } else {
1376                                self.log_fetch_buffer
1377                                    .set_next_in_line_fetch(Some(completed_fetch));
1378                            }
1379                        } else {
1380                            break;
1381                        }
1382                    }
1383                }
1384            }
1385            Ok(())
1386        };
1387
1388        match collect_result {
1389            Ok(()) => Ok(result),
1390            Err(e) => {
1391                if result.is_empty() {
1392                    Err(e)
1393                } else {
1394                    Ok(result)
1395                }
1396            }
1397        }
1398    }
1399
1400    fn fetch_batches_from_fetch(
1401        &self,
1402        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
1403        max_batches: usize,
1404    ) -> Result<Vec<ScanBatch>> {
1405        let table_bucket = next_in_line_fetch.table_bucket().clone();
1406        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
1407
1408        if current_offset.is_none() {
1409            warn!(
1410                "Ignoring fetched batches for {table_bucket:?} since the bucket has been unsubscribed"
1411            );
1412            next_in_line_fetch.drain();
1413            return Ok(Vec::new());
1414        }
1415
1416        let current_offset = current_offset.unwrap();
1417        let fetch_offset = next_in_line_fetch.next_fetch_offset();
1418
1419        if fetch_offset == current_offset {
1420            let batches_with_offsets = next_in_line_fetch.fetch_batches(max_batches)?;
1421            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
1422
1423            if next_fetch_offset > current_offset {
1424                self.log_scanner_status
1425                    .update_offset(&table_bucket, next_fetch_offset);
1426            }
1427
1428            // Convert to ScanBatch with bucket info
1429            Ok(batches_with_offsets
1430                .into_iter()
1431                .map(|(batch, base_offset)| {
1432                    ScanBatch::new(table_bucket.clone(), batch, base_offset)
1433                })
1434                .collect())
1435        } else {
1436            warn!(
1437                "Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
1438            );
1439            next_in_line_fetch.drain();
1440            Ok(Vec::new())
1441        }
1442    }
1443
1444    async fn prepare_fetch_log_requests(&self) -> HashMap<i32, FetchLogRequest> {
1445        let mut fetch_log_req_for_buckets = HashMap::new();
1446        let mut table_id = None;
1447        let mut ready_for_fetch_count = 0;
1448        for bucket in self.fetchable_buckets() {
1449            if table_id.is_none() {
1450                table_id = Some(bucket.table_id());
1451            }
1452
1453            let offset = match self.log_scanner_status.get_bucket_offset(&bucket) {
1454                Some(offset) => offset,
1455                None => {
1456                    debug!(
1457                        "Skipping fetch request for bucket {bucket} because the bucket has been unsubscribed."
1458                    );
1459                    continue;
1460                }
1461            };
1462
1463            match self.get_table_bucket_leader(&bucket) {
1464                None => {
1465                    log::trace!(
1466                        "Skipping fetch request for bucket {bucket} because leader is not available."
1467                    )
1468                }
1469                Some(leader) => {
1470                    if self
1471                        .nodes_with_pending_fetch_requests
1472                        .lock()
1473                        .contains(&leader)
1474                    {
1475                        log::trace!(
1476                            "Skipping fetch request for bucket {bucket} because previous request to server {leader} has not been processed."
1477                        )
1478                    } else {
1479                        let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
1480                            partition_id: bucket.partition_id(),
1481                            bucket_id: bucket.bucket_id(),
1482                            fetch_offset: offset,
1483                            max_fetch_bytes: self.fetch_max_bytes_for_bucket,
1484                        };
1485
1486                        fetch_log_req_for_buckets
1487                            .entry(leader)
1488                            .or_insert_with(Vec::new)
1489                            .push(fetch_log_req_for_bucket);
1490                        ready_for_fetch_count += 1;
1491                    }
1492                }
1493            }
1494        }
1495
1496        if ready_for_fetch_count == 0 {
1497            HashMap::new()
1498        } else {
1499            let (projection_enabled, projected_fields) =
1500                match self.read_context.project_fields_in_order() {
1501                    None => (false, vec![]),
1502                    Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
1503                };
1504
1505            fetch_log_req_for_buckets
1506                .into_iter()
1507                .map(|(leader_id, feq_for_buckets)| {
1508                    let req_for_table = PbFetchLogReqForTable {
1509                        table_id: table_id.unwrap(),
1510                        projection_pushdown_enabled: projection_enabled,
1511                        projected_fields: projected_fields.clone(),
1512                        buckets_req: feq_for_buckets,
1513                    };
1514
1515                    let fetch_log_request = FetchLogRequest {
1516                        follower_server_id: -1,
1517                        max_bytes: self.fetch_max_bytes,
1518                        tables_req: vec![req_for_table],
1519                        max_wait_ms: Some(self.fetch_wait_max_time_ms),
1520                        min_bytes: Some(self.fetch_min_bytes),
1521                    };
1522                    (leader_id, fetch_log_request)
1523                })
1524                .collect()
1525        }
1526    }
1527
1528    fn fetchable_buckets(&self) -> Vec<TableBucket> {
1529        // Get buckets that are not already in the buffer
1530        let buffered = self.log_fetch_buffer.buffered_buckets();
1531        let buffered_set: HashSet<TableBucket> = buffered.into_iter().collect();
1532        self.log_scanner_status
1533            .fetchable_buckets(|tb| !buffered_set.contains(tb))
1534    }
1535
1536    fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
1537        let cluster = self.metadata.get_cluster();
1538        cluster.leader_for(tb).map(|leader| leader.id())
1539    }
1540}
1541
1542pub struct LogScannerStatus {
1543    bucket_status_map: Arc<RwLock<FairBucketStatusMap<BucketScanStatus>>>,
1544}
1545
1546#[allow(dead_code)]
1547impl LogScannerStatus {
1548    pub fn new() -> Self {
1549        Self {
1550            bucket_status_map: Arc::new(RwLock::new(FairBucketStatusMap::new())),
1551        }
1552    }
1553
1554    pub fn prepare_to_poll(&self) -> bool {
1555        let map = self.bucket_status_map.read();
1556        map.size() > 0
1557    }
1558
1559    pub fn move_bucket_to_end(&self, table_bucket: TableBucket) {
1560        let mut map = self.bucket_status_map.write();
1561        map.move_to_end(table_bucket);
1562    }
1563
1564    /// Gets the offset of a bucket if it exists
1565    pub fn get_bucket_offset(&self, table_bucket: &TableBucket) -> Option<i64> {
1566        let map = self.bucket_status_map.read();
1567        map.status_value(table_bucket).map(|status| status.offset())
1568    }
1569
1570    pub fn update_high_watermark(&self, table_bucket: &TableBucket, high_watermark: i64) {
1571        if let Some(status) = self.get_status(table_bucket) {
1572            status.set_high_watermark(high_watermark);
1573        }
1574    }
1575
1576    pub fn update_offset(&self, table_bucket: &TableBucket, offset: i64) {
1577        if let Some(status) = self.get_status(table_bucket) {
1578            status.set_offset(offset);
1579        }
1580    }
1581
1582    pub fn assign_scan_buckets(&self, scan_bucket_offsets: HashMap<TableBucket, i64>) {
1583        let mut map = self.bucket_status_map.write();
1584        for (bucket, offset) in scan_bucket_offsets {
1585            let status = map
1586                .status_value(&bucket)
1587                .cloned()
1588                .unwrap_or_else(|| Arc::new(BucketScanStatus::new(offset)));
1589            status.set_offset(offset);
1590            map.update(bucket, status);
1591        }
1592    }
1593
1594    pub fn assign_scan_bucket(&self, table_bucket: TableBucket, offset: i64) {
1595        let status = Arc::new(BucketScanStatus::new(offset));
1596        self.bucket_status_map.write().update(table_bucket, status);
1597    }
1598
1599    /// Unassigns scan buckets
1600    pub fn unassign_scan_buckets(&self, buckets: &[TableBucket]) {
1601        let mut map = self.bucket_status_map.write();
1602        for bucket in buckets {
1603            map.remove(bucket);
1604        }
1605    }
1606
1607    /// Gets fetchable buckets based on availability predicate
1608    pub fn fetchable_buckets<F>(&self, is_available: F) -> Vec<TableBucket>
1609    where
1610        F: Fn(&TableBucket) -> bool,
1611    {
1612        let map = self.bucket_status_map.read();
1613        let mut result = Vec::new();
1614        map.for_each(|bucket, _| {
1615            if is_available(bucket) {
1616                result.push(bucket.clone());
1617            }
1618        });
1619        result
1620    }
1621
1622    /// Returns all subscribed buckets with their current offsets
1623    pub fn get_all_subscriptions(&self) -> Vec<(TableBucket, i64)> {
1624        let map = self.bucket_status_map.read();
1625        let mut result = Vec::new();
1626        map.for_each(|bucket, status| {
1627            result.push((bucket.clone(), status.offset()));
1628        });
1629        result
1630    }
1631
1632    /// Helper to get bucket status
1633    fn get_status(&self, table_bucket: &TableBucket) -> Option<Arc<BucketScanStatus>> {
1634        let map = self.bucket_status_map.read();
1635        map.status_value(table_bucket).cloned()
1636    }
1637}
1638
1639impl Default for LogScannerStatus {
1640    fn default() -> Self {
1641        Self::new()
1642    }
1643}
1644
/// Scan progress of a single bucket: the offset the scanner has reached and
/// the last high watermark recorded for the bucket.
///
/// Both values are plain `i64`s that are only ever read or overwritten as a
/// whole, so they are stored in atomics instead of behind per-field locks.
/// All methods take `&self`, which lets the status be shared (e.g. inside an
/// `Arc`) and updated from several threads.
#[derive(Debug)]
#[allow(dead_code)]
pub struct BucketScanStatus {
    offset: std::sync::atomic::AtomicI64,
    high_watermark: std::sync::atomic::AtomicI64,
}

#[allow(dead_code)]
impl BucketScanStatus {
    /// Creates a status positioned at `offset` with a high watermark of `0`.
    pub fn new(offset: i64) -> Self {
        Self {
            offset: std::sync::atomic::AtomicI64::new(offset),
            high_watermark: std::sync::atomic::AtomicI64::new(0),
        }
    }

    /// Returns the current scan offset.
    pub fn offset(&self) -> i64 {
        // SeqCst keeps visibility at least as strong as the lock it replaces.
        self.offset.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Overwrites the current scan offset.
    pub fn set_offset(&self, offset: i64) {
        self.offset
            .store(offset, std::sync::atomic::Ordering::SeqCst)
    }

    /// Returns the last recorded high watermark.
    pub fn high_watermark(&self) -> i64 {
        self.high_watermark
            .load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Overwrites the recorded high watermark.
    pub fn set_high_watermark(&self, high_watermark: i64) {
        self.high_watermark
            .store(high_watermark, std::sync::atomic::Ordering::SeqCst)
    }
}
1677
1678fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> Result<()> {
1679    if table_info.schema.primary_key().is_some() {
1680        return Err(UnsupportedOperation {
1681            message: format!("Table {table_path} is not a Log Table and doesn't support scan."),
1682        });
1683    }
1684
1685    let log_format = table_info.table_config.get_log_format()?;
1686    if LogFormat::ARROW != log_format {
1687        return Err(UnsupportedOperation {
1688            message: format!(
1689                "Scan is only supported for ARROW format and table {table_path} uses {log_format} format"
1690            ),
1691        });
1692    }
1693
1694    Ok(())
1695}
1696
1697#[cfg(test)]
1698mod tests {
1699    use super::*;
1700    use crate::client::WriteRecord;
1701    use crate::client::metadata::Metadata;
1702    use crate::compression::{
1703        ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
1704    };
1705    use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath};
1706    use crate::record::MemoryLogRecordsArrowBuilder;
1707    use crate::row::{Datum, GenericRow};
1708    use crate::rpc::FlussError;
1709    use crate::test_utils::{build_cluster_arc, build_table_info};
1710
1711    fn build_records(table_info: &TableInfo, table_path: Arc<TablePath>) -> Result<Vec<u8>> {
1712        let mut builder = MemoryLogRecordsArrowBuilder::new(
1713            1,
1714            table_info.get_row_type(),
1715            false,
1716            ArrowCompressionInfo {
1717                compression_type: ArrowCompressionType::None,
1718                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
1719            },
1720        )?;
1721        let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
1722        let row = GenericRow {
1723            values: vec![Datum::Int32(1)],
1724        };
1725        let record =
1726            WriteRecord::for_append(Arc::new(table_info.clone()), physical_table_path, 1, &row);
1727        builder.append(&record)?;
1728        builder.build()
1729    }
1730
1731    #[tokio::test]
1732    async fn collect_fetches_updates_offset() -> Result<()> {
1733        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1734        let table_info = build_table_info(table_path.clone(), 1, 1);
1735        let cluster = build_cluster_arc(&table_path, 1, 1);
1736        let metadata = Arc::new(Metadata::new_for_test(cluster));
1737        let status = Arc::new(LogScannerStatus::new());
1738        let fetcher = LogFetcher::new(
1739            table_info.clone(),
1740            Arc::new(RpcClient::new()),
1741            metadata,
1742            status.clone(),
1743            &crate::config::Config::default(),
1744            None,
1745        )?;
1746
1747        let bucket = TableBucket::new(1, 0);
1748        status.assign_scan_bucket(bucket.clone(), 0);
1749
1750        let data = build_records(&table_info, Arc::new(table_path))?;
1751        let log_records = LogRecordsBatches::new(data.clone());
1752        let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
1753        let completed =
1754            DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0);
1755        fetcher.log_fetch_buffer.add(Box::new(completed));
1756
1757        let fetched = fetcher.collect_fetches()?;
1758        assert_eq!(fetched.get(&bucket).unwrap().len(), 1);
1759        assert_eq!(status.get_bucket_offset(&bucket), Some(1));
1760        Ok(())
1761    }
1762
1763    #[tokio::test]
1764    async fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
1765        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1766        let table_info = build_table_info(table_path.clone(), 1, 1);
1767        let cluster = build_cluster_arc(&table_path, 1, 1);
1768        let metadata = Arc::new(Metadata::new_for_test(cluster));
1769        let status = Arc::new(LogScannerStatus::new());
1770        let fetcher = LogFetcher::new(
1771            table_info.clone(),
1772            Arc::new(RpcClient::new()),
1773            metadata,
1774            status,
1775            &crate::config::Config::default(),
1776            None,
1777        )?;
1778
1779        let bucket = TableBucket::new(1, 0);
1780        let data = build_records(&table_info, Arc::new(table_path))?;
1781        let log_records = LogRecordsBatches::new(data.clone());
1782        let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
1783        let mut completed: Box<dyn CompletedFetch> = Box::new(DefaultCompletedFetch::new(
1784            bucket,
1785            log_records,
1786            data.len(),
1787            read_context,
1788            0,
1789            0,
1790        ));
1791
1792        let records = fetcher.fetch_records_from_fetch(&mut completed, 10)?;
1793        assert!(records.is_empty());
1794        assert!(completed.is_consumed());
1795        Ok(())
1796    }
1797
1798    #[tokio::test]
1799    async fn prepare_fetch_log_requests_skips_pending() -> Result<()> {
1800        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1801        let table_info = build_table_info(table_path.clone(), 1, 1);
1802        let cluster = build_cluster_arc(&table_path, 1, 1);
1803        let metadata = Arc::new(Metadata::new_for_test(cluster));
1804        let status = Arc::new(LogScannerStatus::new());
1805        status.assign_scan_bucket(TableBucket::new(1, 0), 0);
1806        let fetcher = LogFetcher::new(
1807            table_info,
1808            Arc::new(RpcClient::new()),
1809            metadata,
1810            status,
1811            &crate::config::Config::default(),
1812            None,
1813        )?;
1814
1815        fetcher.nodes_with_pending_fetch_requests.lock().insert(1);
1816
1817        let requests = fetcher.prepare_fetch_log_requests().await;
1818        assert!(requests.is_empty());
1819        Ok(())
1820    }
1821
1822    #[tokio::test]
1823    async fn handle_fetch_response_sets_error() -> Result<()> {
1824        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1825        let table_info = build_table_info(table_path.clone(), 1, 1);
1826        let cluster = build_cluster_arc(&table_path, 1, 1);
1827        let metadata = Arc::new(Metadata::new_for_test(cluster));
1828        let status = Arc::new(LogScannerStatus::new());
1829        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
1830        let fetcher = LogFetcher::new(
1831            table_info.clone(),
1832            Arc::new(RpcClient::new()),
1833            metadata.clone(),
1834            status.clone(),
1835            &crate::config::Config::default(),
1836            None,
1837        )?;
1838
1839        let response = crate::proto::FetchLogResponse {
1840            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
1841                table_id: 1,
1842                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
1843                    partition_id: None,
1844                    bucket_id: 0,
1845                    error_code: Some(FlussError::AuthorizationException.code()),
1846                    error_message: Some("denied".to_string()),
1847                    high_watermark: None,
1848                    log_start_offset: None,
1849                    remote_log_fetch_info: None,
1850                    records: None,
1851                }],
1852            }],
1853        };
1854
1855        let response_context = FetchResponseContext {
1856            metadata: metadata.clone(),
1857            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
1858            log_scanner_status: fetcher.log_scanner_status.clone(),
1859            read_context: fetcher.read_context.clone(),
1860            remote_read_context: fetcher.remote_read_context.clone(),
1861            remote_log_downloader: fetcher.remote_log_downloader.clone(),
1862        };
1863
1864        LogFetcher::handle_fetch_response(response, response_context).await;
1865
1866        let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch");
1867        let api_error = completed.api_error().expect("api error");
1868        assert_eq!(api_error.code, FlussError::AuthorizationException.code());
1869        Ok(())
1870    }
1871
1872    #[tokio::test]
1873    async fn handle_fetch_response_invalidates_table_meta() -> Result<()> {
1874        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1875        let table_info = build_table_info(table_path.clone(), 1, 1);
1876        let cluster = build_cluster_arc(&table_path, 1, 1);
1877        let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
1878        let status = Arc::new(LogScannerStatus::new());
1879        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
1880        let fetcher = LogFetcher::new(
1881            table_info.clone(),
1882            Arc::new(RpcClient::new()),
1883            metadata.clone(),
1884            status.clone(),
1885            &crate::config::Config::default(),
1886            None,
1887        )?;
1888
1889        let bucket = TableBucket::new(1, 0);
1890        assert!(metadata.leader_for(&table_path, &bucket).await?.is_some());
1891
1892        let response = crate::proto::FetchLogResponse {
1893            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
1894                table_id: 1,
1895                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
1896                    partition_id: None,
1897                    bucket_id: 0,
1898                    error_code: Some(FlussError::NotLeaderOrFollower.code()),
1899                    error_message: Some("not leader".to_string()),
1900                    high_watermark: None,
1901                    log_start_offset: None,
1902                    remote_log_fetch_info: None,
1903                    records: None,
1904                }],
1905            }],
1906        };
1907
1908        let response_context = FetchResponseContext {
1909            metadata: metadata.clone(),
1910            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
1911            log_scanner_status: fetcher.log_scanner_status.clone(),
1912            read_context: fetcher.read_context.clone(),
1913            remote_read_context: fetcher.remote_read_context.clone(),
1914            remote_log_downloader: fetcher.remote_log_downloader.clone(),
1915        };
1916
1917        LogFetcher::handle_fetch_response(response, response_context).await;
1918
1919        assert!(metadata.get_cluster().leader_for(&bucket).is_none());
1920        Ok(())
1921    }
1922
1923    fn create_test_table_info(
1924        has_primary_key: bool,
1925        log_format: Option<&str>,
1926    ) -> (TableInfo, TablePath) {
1927        let mut schema_builder = Schema::builder()
1928            .column("id", DataTypes::int())
1929            .column("name", DataTypes::string());
1930
1931        if has_primary_key {
1932            schema_builder = schema_builder.primary_key(vec!["id"]);
1933        }
1934
1935        let schema = schema_builder.build().unwrap();
1936        let table_path = TablePath::new("test_db", "test_table");
1937
1938        let mut properties = HashMap::new();
1939        if let Some(format) = log_format {
1940            properties.insert("table.log.format".to_string(), format.to_string());
1941        }
1942
1943        let table_info = TableInfo::new(
1944            table_path.clone(),
1945            1,
1946            1,
1947            schema,
1948            vec![],
1949            Arc::from(vec![]),
1950            1,
1951            properties,
1952            HashMap::new(),
1953            None,
1954            0,
1955            0,
1956        );
1957
1958        (table_info, table_path)
1959    }
1960
1961    #[test]
1962    fn test_validate_scan_support() {
1963        // Primary key table
1964        let (table_info, table_path) = create_test_table_info(true, Some("ARROW"));
1965        let result = validate_scan_support(&table_path, &table_info);
1966
1967        assert!(result.is_err());
1968        let err = result.unwrap_err();
1969        assert!(matches!(err, UnsupportedOperation { .. }));
1970        assert!(err.to_string().contains(
1971            format!("Table {table_path} is not a Log Table and doesn't support scan.").as_str()
1972        ));
1973
1974        // Indexed format
1975        let (table_info, table_path) = create_test_table_info(false, Some("INDEXED"));
1976        let result = validate_scan_support(&table_path, &table_info);
1977
1978        assert!(result.is_err());
1979        let err = result.unwrap_err();
1980        assert!(matches!(err, UnsupportedOperation { .. }));
1981        assert!(err.to_string().contains(format!("Scan is only supported for ARROW format and table {table_path} uses INDEXED format").as_str()));
1982
1983        // Default format
1984        let (table_info, table_path) = create_test_table_info(false, None);
1985        let result = validate_scan_support(&table_path, &table_info);
1986        assert!(result.is_ok());
1987
1988        // Arrow format
1989        let (table_info, table_path) = create_test_table_info(false, Some("ARROW"));
1990        let result = validate_scan_support(&table_path, &table_info);
1991        assert!(result.is_ok());
1992    }
1993    #[tokio::test]
1994    async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> Result<()> {
1995        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1996        let table_info = build_table_info(table_path.clone(), 1, 1);
1997        let cluster = build_cluster_arc(&table_path, 1, 1);
1998        let metadata = Arc::new(Metadata::new_for_test(cluster));
1999        let status = Arc::new(LogScannerStatus::new());
2000        status.assign_scan_bucket(TableBucket::new(1, 0), 0);
2001
2002        let config = crate::config::Config {
2003            scanner_log_fetch_max_bytes: 1234,
2004            scanner_log_fetch_min_bytes: 7,
2005            scanner_log_fetch_wait_max_time_ms: 89,
2006            scanner_log_fetch_max_bytes_for_bucket: 512,
2007            ..crate::config::Config::default()
2008        };
2009
2010        let fetcher = LogFetcher::new(
2011            table_info,
2012            Arc::new(RpcClient::new()),
2013            metadata,
2014            status,
2015            &config,
2016            None,
2017        )?;
2018
2019        let requests = fetcher.prepare_fetch_log_requests().await;
2020        // In this test cluster, leader id should exist; but even if it changes,
2021        // assert over all built requests.
2022        assert!(!requests.is_empty());
2023        for req in requests.values() {
2024            assert_eq!(req.max_bytes, 1234);
2025            assert_eq!(req.min_bytes, Some(7));
2026            assert_eq!(req.max_wait_ms, Some(89));
2027
2028            for table_req in &req.tables_req {
2029                for bucket_req in &table_req.buckets_req {
2030                    assert_eq!(bucket_req.max_fetch_bytes, 512);
2031                }
2032            }
2033        }
2034        Ok(())
2035    }
2036}