1use crate::client::connection::FlussConnection;
19use crate::client::credentials::SecurityTokenManager;
20use crate::client::metadata::Metadata;
21use crate::client::table::log_fetch_buffer::{
22 CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel,
23 LogFetchBuffer, RemotePendingFetch,
24};
25use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
26use crate::error::Error::UnsupportedOperation;
27use crate::error::{ApiError, Error, FlussError, Result};
28use crate::metadata::{LogFormat, PhysicalTablePath, TableBucket, TableInfo, TablePath};
29use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
30use crate::record::{
31 LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
32};
33use crate::rpc::{RpcClient, RpcError, message};
34use crate::util::FairBucketStatusMap;
35use crate::{PartitionId, TableId};
36use arrow_schema::SchemaRef;
37use log::{debug, warn};
38use parking_lot::{Mutex, RwLock};
39use std::{
40 collections::{HashMap, HashSet},
41 slice::from_ref,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tempfile::TempDir;
46
47pub struct TableScan<'a> {
48 conn: &'a FlussConnection,
49 table_info: TableInfo,
50 metadata: Arc<Metadata>,
51 projected_fields: Option<Vec<usize>>,
53}
54
55impl<'a> TableScan<'a> {
56 pub fn new(conn: &'a FlussConnection, table_info: TableInfo, metadata: Arc<Metadata>) -> Self {
57 Self {
58 conn,
59 table_info,
60 metadata,
61 projected_fields: None,
62 }
63 }
64
65 pub fn project(mut self, column_indices: &[usize]) -> Result<Self> {
120 if column_indices.is_empty() {
121 return Err(Error::IllegalArgument {
122 message: "Column indices cannot be empty".to_string(),
123 });
124 }
125 let field_count = self.table_info.row_type().fields().len();
126 for &idx in column_indices {
127 if idx >= field_count {
128 return Err(Error::IllegalArgument {
129 message: format!(
130 "Column index {} out of range (max: {})",
131 idx,
132 field_count - 1
133 ),
134 });
135 }
136 }
137 self.projected_fields = Some(column_indices.to_vec());
138 Ok(self)
139 }
140
141 pub fn project_by_name(mut self, column_names: &[&str]) -> Result<Self> {
193 if column_names.is_empty() {
194 return Err(Error::IllegalArgument {
195 message: "Column names cannot be empty".to_string(),
196 });
197 }
198 let row_type = self.table_info.row_type();
199 let mut indices = Vec::new();
200
201 for name in column_names {
202 let idx = row_type
203 .fields()
204 .iter()
205 .position(|f| f.name() == *name)
206 .ok_or_else(|| Error::IllegalArgument {
207 message: format!("Column '{name}' not found"),
208 })?;
209 indices.push(idx);
210 }
211
212 self.projected_fields = Some(indices);
213 Ok(self)
214 }
215
216 pub fn create_log_scanner(self) -> Result<LogScanner> {
217 validate_scan_support(&self.table_info.table_path, &self.table_info)?;
218 let inner = LogScannerInner::new(
219 &self.table_info,
220 self.metadata.clone(),
221 self.conn.get_connections(),
222 self.conn.config(),
223 self.projected_fields,
224 )?;
225 Ok(LogScanner {
226 inner: Arc::new(inner),
227 })
228 }
229
230 pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
231 validate_scan_support(&self.table_info.table_path, &self.table_info)?;
232 let inner = LogScannerInner::new(
233 &self.table_info,
234 self.metadata.clone(),
235 self.conn.get_connections(),
236 self.conn.config(),
237 self.projected_fields,
238 )?;
239 Ok(RecordBatchLogScanner {
240 inner: Arc::new(inner),
241 })
242 }
243}
244
245pub struct LogScanner {
250 inner: Arc<LogScannerInner>,
251}
252
253pub struct RecordBatchLogScanner {
258 inner: Arc<LogScannerInner>,
259}
260
261struct LogScannerInner {
263 table_path: TablePath,
264 table_id: TableId,
265 metadata: Arc<Metadata>,
266 log_scanner_status: Arc<LogScannerStatus>,
267 log_fetcher: LogFetcher,
268 is_partitioned_table: bool,
269}
270
271impl LogScannerInner {
272 fn new(
273 table_info: &TableInfo,
274 metadata: Arc<Metadata>,
275 connections: Arc<RpcClient>,
276 config: &crate::config::Config,
277 projected_fields: Option<Vec<usize>>,
278 ) -> Result<Self> {
279 let log_scanner_status = Arc::new(LogScannerStatus::new());
280 Ok(Self {
281 table_path: table_info.table_path.clone(),
282 table_id: table_info.table_id,
283 is_partitioned_table: table_info.is_partitioned(),
284 metadata: metadata.clone(),
285 log_scanner_status: log_scanner_status.clone(),
286 log_fetcher: LogFetcher::new(
287 table_info.clone(),
288 connections.clone(),
289 metadata.clone(),
290 log_scanner_status.clone(),
291 config,
292 projected_fields,
293 )?,
294 })
295 }
296
297 async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
298 let start = Instant::now();
299 let deadline = start + timeout;
300
301 loop {
302 let fetch_result = self.poll_for_fetches().await?;
304
305 if !fetch_result.is_empty() {
306 self.log_fetcher.send_fetches().await?;
309 return Ok(ScanRecords::new(fetch_result));
310 }
311
312 let now = Instant::now();
314 if now >= deadline {
315 return Ok(ScanRecords::new(HashMap::new()));
317 }
318
319 let remaining = deadline - now;
321 let has_data = self
322 .log_fetcher
323 .log_fetch_buffer
324 .await_not_empty(remaining)
325 .await?;
326
327 if !has_data {
328 return Ok(ScanRecords::new(HashMap::new()));
330 }
331
332 }
334 }
335
336 async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
337 if self.is_partitioned_table {
338 return Err(Error::UnsupportedOperation {
339 message: "The table is a partitioned table, please use \"subscribe_partition\" to \
340 subscribe a partitioned bucket instead."
341 .to_string(),
342 });
343 }
344 let table_bucket = TableBucket::new(self.table_id, bucket);
345 self.metadata
346 .check_and_update_table_metadata(from_ref(&self.table_path))
347 .await?;
348 self.log_scanner_status
349 .assign_scan_bucket(table_bucket, offset);
350 Ok(())
351 }
352
353 async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
354 if self.is_partitioned_table {
355 return Err(Error::UnsupportedOperation {
356 message:
357 "The table is a partitioned table, please use \"subscribe_partition_buckets\" instead."
358 .to_string(),
359 });
360 }
361
362 let mut scan_bucket_offsets = HashMap::new();
363 for (bucket_id, offset) in bucket_offsets {
364 let table_bucket = TableBucket::new(self.table_id, *bucket_id);
365 scan_bucket_offsets.insert(table_bucket, *offset);
366 }
367 self.do_subscribe_buckets(scan_bucket_offsets).await
368 }
369
370 async fn subscribe_partition(
371 &self,
372 partition_id: PartitionId,
373 bucket: i32,
374 offset: i64,
375 ) -> Result<()> {
376 if !self.is_partitioned_table {
377 return Err(Error::UnsupportedOperation {
378 message: "The table is not a partitioned table, please use \"subscribe\" to \
379 subscribe a non-partitioned bucket instead."
380 .to_string(),
381 });
382 }
383 let table_bucket =
384 TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
385 self.metadata
386 .check_and_update_table_metadata(from_ref(&self.table_path))
387 .await?;
388 self.log_scanner_status
389 .assign_scan_bucket(table_bucket, offset);
390 Ok(())
391 }
392
393 async fn subscribe_partition_buckets(
394 &self,
395 partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
396 ) -> Result<()> {
397 if !self.is_partitioned_table {
398 return Err(UnsupportedOperation {
399 message: "The table is not a partitioned table, please use \"subscribe_buckets\" \
400 to subscribe to non-partitioned buckets instead."
401 .to_string(),
402 });
403 }
404
405 let mut scan_bucket_offsets = HashMap::new();
406 for (&(partition_id, bucket_id), &offset) in partition_bucket_offsets {
407 let table_bucket =
408 TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket_id);
409 scan_bucket_offsets.insert(table_bucket, offset);
410 }
411 self.do_subscribe_buckets(scan_bucket_offsets).await
412 }
413
414 async fn do_subscribe_buckets(&self, bucket_offsets: HashMap<TableBucket, i64>) -> Result<()> {
415 if bucket_offsets.is_empty() {
416 return Err(Error::UnexpectedError {
417 message: "Bucket offsets are empty.".to_string(),
418 source: None,
419 });
420 }
421
422 self.metadata
423 .check_and_update_table_metadata(from_ref(&self.table_path))
424 .await?;
425
426 self.log_scanner_status.assign_scan_buckets(bucket_offsets);
427 Ok(())
428 }
429
430 async fn unsubscribe(&self, bucket: i32) -> Result<()> {
431 if self.is_partitioned_table {
432 return Err(Error::UnsupportedOperation {
433 message:
434 "The table is a partitioned table, please use \"unsubscribe_partition\" to \
435 unsubscribe a partitioned bucket instead."
436 .to_string(),
437 });
438 }
439 let table_bucket = TableBucket::new(self.table_id, bucket);
440 self.log_scanner_status
441 .unassign_scan_buckets(from_ref(&table_bucket));
442 Ok(())
443 }
444
445 async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: i32) -> Result<()> {
446 if !self.is_partitioned_table {
447 return Err(Error::UnsupportedOperation {
448 message: "Can't unsubscribe a partition for a non-partitioned table.".to_string(),
449 });
450 }
451 let table_bucket =
452 TableBucket::new_with_partition(self.table_id, Some(partition_id), bucket);
453 self.log_scanner_status
454 .unassign_scan_buckets(from_ref(&table_bucket));
455 Ok(())
456 }
457
458 async fn poll_for_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
459 let result = self.log_fetcher.collect_fetches()?;
460 if !result.is_empty() {
461 return Ok(result);
462 }
463
464 self.log_fetcher.send_fetches().await?;
466
467 self.log_fetcher.collect_fetches()
469 }
470
471 async fn poll_batches(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
472 let start = Instant::now();
473 let deadline = start + timeout;
474
475 loop {
476 let batches = self.poll_for_batches().await?;
477
478 if !batches.is_empty() {
479 self.log_fetcher.send_fetches().await?;
480 return Ok(batches);
481 }
482
483 let now = Instant::now();
484 if now >= deadline {
485 return Ok(Vec::new());
486 }
487
488 let remaining = deadline - now;
489 let has_data = self
490 .log_fetcher
491 .log_fetch_buffer
492 .await_not_empty(remaining)
493 .await?;
494
495 if !has_data {
496 return Ok(Vec::new());
497 }
498 }
499 }
500
501 async fn poll_for_batches(&self) -> Result<Vec<ScanBatch>> {
502 let result = self.log_fetcher.collect_batches()?;
503 if !result.is_empty() {
504 return Ok(result);
505 }
506
507 self.log_fetcher.send_fetches().await?;
508 self.log_fetcher.collect_batches()
509 }
510}
511
512impl LogScanner {
514 pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
515 self.inner.poll_records(timeout).await
516 }
517
518 pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
519 self.inner.subscribe(bucket, offset).await
520 }
521
522 pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
523 self.inner.subscribe_buckets(bucket_offsets).await
524 }
525
526 pub async fn subscribe_partition(
527 &self,
528 partition_id: PartitionId,
529 bucket: i32,
530 offset: i64,
531 ) -> Result<()> {
532 self.inner
533 .subscribe_partition(partition_id, bucket, offset)
534 .await
535 }
536
537 pub async fn subscribe_partition_buckets(
538 &self,
539 partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
540 ) -> Result<()> {
541 self.inner
542 .subscribe_partition_buckets(partition_bucket_offsets)
543 .await
544 }
545
546 pub async fn unsubscribe(&self, bucket: i32) -> Result<()> {
547 self.inner.unsubscribe(bucket).await
548 }
549
550 pub async fn unsubscribe_partition(
551 &self,
552 partition_id: PartitionId,
553 bucket: i32,
554 ) -> Result<()> {
555 self.inner.unsubscribe_partition(partition_id, bucket).await
556 }
557}
558
559impl RecordBatchLogScanner {
561 pub async fn poll(&self, timeout: Duration) -> Result<Vec<ScanBatch>> {
563 self.inner.poll_batches(timeout).await
564 }
565
566 pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
567 self.inner.subscribe(bucket, offset).await
568 }
569
570 pub async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
571 self.inner.subscribe_buckets(bucket_offsets).await
572 }
573
574 pub async fn subscribe_partition(
575 &self,
576 partition_id: PartitionId,
577 bucket: i32,
578 offset: i64,
579 ) -> Result<()> {
580 self.inner
581 .subscribe_partition(partition_id, bucket, offset)
582 .await
583 }
584
585 pub fn is_partitioned(&self) -> bool {
587 self.inner.is_partitioned_table
588 }
589
590 pub fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)> {
592 self.inner.log_scanner_status.get_all_subscriptions()
593 }
594
595 pub async fn subscribe_partition_buckets(
596 &self,
597 partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
598 ) -> Result<()> {
599 self.inner
600 .subscribe_partition_buckets(partition_bucket_offsets)
601 .await
602 }
603
604 pub async fn unsubscribe(&self, bucket: i32) -> Result<()> {
605 self.inner.unsubscribe(bucket).await
606 }
607
608 pub async fn unsubscribe_partition(
609 &self,
610 partition_id: PartitionId,
611 bucket: i32,
612 ) -> Result<()> {
613 self.inner.unsubscribe_partition(partition_id, bucket).await
614 }
615}
616
617struct LogFetcher {
618 conns: Arc<RpcClient>,
619 metadata: Arc<Metadata>,
620 table_path: TablePath,
621 is_partitioned: bool,
622 log_scanner_status: Arc<LogScannerStatus>,
623 read_context: ReadContext,
624 remote_read_context: ReadContext,
625 remote_log_downloader: Arc<RemoteLogDownloader>,
626 #[allow(dead_code)]
629 security_token_manager: Arc<SecurityTokenManager>,
630 log_fetch_buffer: Arc<LogFetchBuffer>,
631 nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
632 max_poll_records: usize,
633 fetch_max_bytes: i32,
634 fetch_min_bytes: i32,
635 fetch_wait_max_time_ms: i32,
636 fetch_max_bytes_for_bucket: i32,
637}
638
639struct FetchResponseContext {
640 metadata: Arc<Metadata>,
641 log_fetch_buffer: Arc<LogFetchBuffer>,
642 log_scanner_status: Arc<LogScannerStatus>,
643 read_context: ReadContext,
644 remote_read_context: ReadContext,
645 remote_log_downloader: Arc<RemoteLogDownloader>,
646}
647
648impl LogFetcher {
649 pub fn new(
650 table_info: TableInfo,
651 conns: Arc<RpcClient>,
652 metadata: Arc<Metadata>,
653 log_scanner_status: Arc<LogScannerStatus>,
654 config: &crate::config::Config,
655 projected_fields: Option<Vec<usize>>,
656 ) -> Result<Self> {
657 let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
658 let read_context =
659 Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?;
660 let remote_read_context =
661 Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?;
662
663 let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
664 let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone()));
665
666 let security_token_manager =
668 Arc::new(SecurityTokenManager::new(conns.clone(), metadata.clone()));
669
670 let credentials_rx = security_token_manager.subscribe();
672
673 let remote_log_downloader = Arc::new(RemoteLogDownloader::new(
674 tmp_dir,
675 config.scanner_remote_log_prefetch_num,
676 config.remote_file_download_thread_num,
677 config.scanner_remote_log_read_concurrency,
678 credentials_rx,
679 )?);
680
681 security_token_manager.start();
683
684 Ok(LogFetcher {
685 conns: conns.clone(),
686 metadata: metadata.clone(),
687 table_path: table_info.table_path.clone(),
688 is_partitioned: table_info.is_partitioned(),
689 log_scanner_status,
690 read_context,
691 remote_read_context,
692 remote_log_downloader,
693 security_token_manager,
694 log_fetch_buffer,
695 nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
696 max_poll_records: config.scanner_log_max_poll_records,
697 fetch_max_bytes: config.scanner_log_fetch_max_bytes,
698 fetch_min_bytes: config.scanner_log_fetch_min_bytes,
699 fetch_wait_max_time_ms: config.scanner_log_fetch_wait_max_time_ms,
700 fetch_max_bytes_for_bucket: config.scanner_log_fetch_max_bytes_for_bucket,
701 })
702 }
703
704 fn create_read_context(
705 full_arrow_schema: SchemaRef,
706 projected_fields: Option<Vec<usize>>,
707 is_from_remote: bool,
708 ) -> Result<ReadContext> {
709 match projected_fields {
710 None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
711 Some(fields) => {
712 ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
713 }
714 }
715 }
716
717 fn describe_fetch_error(
718 error: FlussError,
719 table_bucket: &TableBucket,
720 fetch_offset: i64,
721 error_message: &str,
722 ) -> FetchErrorContext {
723 match error {
724 FlussError::NotLeaderOrFollower
725 | FlussError::LogStorageException
726 | FlussError::KvStorageException
727 | FlussError::StorageException
728 | FlussError::FencedLeaderEpochException
729 | FlussError::LeaderNotAvailableException => FetchErrorContext {
730 action: FetchErrorAction::Ignore,
731 log_level: FetchErrorLogLevel::Debug,
732 log_message: format!(
733 "Error in fetch for bucket {table_bucket}: {error:?}: {error_message}"
734 ),
735 },
736 FlussError::UnknownTableOrBucketException => FetchErrorContext {
737 action: FetchErrorAction::Ignore,
738 log_level: FetchErrorLogLevel::Warn,
739 log_message: format!(
740 "Received unknown table or bucket error in fetch for bucket {table_bucket}"
741 ),
742 },
743 FlussError::LogOffsetOutOfRangeException => FetchErrorContext {
744 action: FetchErrorAction::LogOffsetOutOfRange,
745 log_level: FetchErrorLogLevel::Debug,
746 log_message: format!(
747 "The fetching offset {fetch_offset} is out of range for bucket {table_bucket}: {error_message}"
748 ),
749 },
750 FlussError::AuthorizationException => FetchErrorContext {
751 action: FetchErrorAction::Authorization,
752 log_level: FetchErrorLogLevel::Debug,
753 log_message: format!(
754 "Authorization error while fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
755 ),
756 },
757 FlussError::UnknownServerError => FetchErrorContext {
758 action: FetchErrorAction::Ignore,
759 log_level: FetchErrorLogLevel::Warn,
760 log_message: format!(
761 "Unknown server error while fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
762 ),
763 },
764 FlussError::CorruptMessage => FetchErrorContext {
765 action: FetchErrorAction::CorruptMessage,
766 log_level: FetchErrorLogLevel::Debug,
767 log_message: format!(
768 "Encountered corrupt message when fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
769 ),
770 },
771 _ => FetchErrorContext {
772 action: FetchErrorAction::Unexpected,
773 log_level: FetchErrorLogLevel::Debug,
774 log_message: format!(
775 "Unexpected error code {error:?} while fetching at offset {fetch_offset} from bucket {table_bucket}: {error_message}"
776 ),
777 },
778 }
779 }
780
781 fn should_invalidate_table_meta(error: FlussError) -> bool {
782 matches!(
783 error,
784 FlussError::NotLeaderOrFollower
785 | FlussError::LeaderNotAvailableException
786 | FlussError::FencedLeaderEpochException
787 | FlussError::UnknownTableOrBucketException
788 | FlussError::InvalidCoordinatorException
789 )
790 }
791
792 async fn check_and_update_metadata(&self, table_buckets: &[TableBucket]) -> Result<()> {
793 let mut partition_ids = Vec::new();
794 let mut need_update = false;
795
796 for tb in table_buckets {
797 if self.get_table_bucket_leader(tb).is_some() {
798 continue;
799 }
800
801 if self.is_partitioned {
802 partition_ids.push(tb.partition_id().unwrap());
803 } else {
804 need_update = true;
805 break;
806 }
807 }
808
809 let update_result = if self.is_partitioned && !partition_ids.is_empty() {
810 self.metadata
811 .update_tables_metadata(
812 &HashSet::from([&self.table_path]),
813 &HashSet::new(),
814 partition_ids,
815 )
816 .await
817 } else if need_update {
818 self.metadata.update_table_metadata(&self.table_path).await
819 } else {
820 Ok(())
821 };
822
823 update_result.or_else(|e| {
825 if let Error::RpcError { source, .. } = &e
826 && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_))
827 {
828 warn!("Retrying after encountering error while updating table metadata: {e}");
829 Ok(())
830 } else {
831 Err(e)
832 }
833 })?;
834 Ok(())
835 }
836
837 async fn send_fetches(&self) -> Result<()> {
839 self.check_and_update_metadata(self.fetchable_buckets().as_slice())
840 .await?;
841 let fetch_request = self.prepare_fetch_log_requests().await;
842
843 for (leader, fetch_request) in fetch_request {
844 debug!("Adding pending request for node id {leader}");
845 {
847 self.nodes_with_pending_fetch_requests.lock().insert(leader);
848 }
849
850 let cluster = self.metadata.get_cluster().clone();
851
852 let conns = Arc::clone(&self.conns);
853 let log_fetch_buffer = self.log_fetch_buffer.clone();
854 let log_scanner_status = self.log_scanner_status.clone();
855 let read_context = self.read_context.clone();
856 let remote_read_context = self.remote_read_context.clone();
857 let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
858 let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
859 let metadata = self.metadata.clone();
860 let response_context = FetchResponseContext {
861 metadata: metadata.clone(),
862 log_fetch_buffer,
863 log_scanner_status,
864 read_context,
865 remote_read_context,
866 remote_log_downloader,
867 };
868 tokio::spawn(async move {
876 let _guard = scopeguard::guard((), |_| {
878 nodes_with_pending.lock().remove(&leader);
879 });
880
881 let server_node = match cluster.get_tablet_server(leader) {
882 Some(node) => node,
883 None => {
884 warn!("No server node found for leader {leader}, retrying");
885 Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
886 return;
887 }
888 };
889
890 let con = match conns.get_connection(server_node).await {
891 Ok(con) => con,
892 Err(e) => {
893 warn!("Retrying after error getting connection to destination node: {e:?}");
894 Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
895 return;
896 }
897 };
898
899 let fetch_response = match con
900 .request(message::FetchLogRequest::new(fetch_request.clone()))
901 .await
902 {
903 Ok(resp) => resp,
904 Err(e) => {
905 warn!(
906 "Retrying after error fetching log from destination node {server_node:?}: {e:?}"
907 );
908 Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
909 return;
910 }
911 };
912
913 Self::handle_fetch_response(fetch_response, response_context).await;
914 });
915 }
916
917 Ok(())
918 }
919
920 async fn handle_fetch_failure(
921 metadata: Arc<Metadata>,
922 server_id: &i32,
923 request: &FetchLogRequest,
924 ) {
925 let table_ids = request.tables_req.iter().map(|r| r.table_id).collect();
926 metadata.invalidate_server(server_id, table_ids);
927 }
928
929 async fn handle_fetch_response(
931 fetch_response: crate::proto::FetchLogResponse,
932 context: FetchResponseContext,
933 ) {
934 let FetchResponseContext {
935 metadata,
936 log_fetch_buffer,
937 log_scanner_status,
938 read_context,
939 remote_read_context,
940 remote_log_downloader,
941 } = context;
942
943 for pb_fetch_log_resp in fetch_response.tables_resp {
944 let table_id = pb_fetch_log_resp.table_id;
945 let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
946
947 for fetch_log_for_bucket in fetch_log_for_buckets {
948 let bucket: i32 = fetch_log_for_bucket.bucket_id;
949 let table_bucket = TableBucket::new_with_partition(
950 table_id,
951 fetch_log_for_bucket.partition_id,
952 bucket,
953 );
954
955 let Some(fetch_offset) = log_scanner_status.get_bucket_offset(&table_bucket) else {
957 debug!(
958 "Ignoring fetch log response for bucket {table_bucket} because the bucket has been unsubscribed."
959 );
960 continue;
961 };
962
963 if let Some(error_code) = fetch_log_for_bucket.error_code
964 && error_code != FlussError::None.code()
965 {
966 let api_error: ApiError = ErrorResponse {
967 error_code,
968 error_message: fetch_log_for_bucket.error_message.clone(),
969 }
970 .into();
971
972 let error = FlussError::for_code(error_code);
973 if Self::should_invalidate_table_meta(error) {
974 let table_id = table_bucket.table_id();
976 let cluster = metadata.get_cluster();
977 if let Some(table_path) = cluster.get_table_path_by_id(table_id) {
978 let physical_tables = HashSet::from([PhysicalTablePath::of(Arc::new(
979 table_path.clone(),
980 ))]);
981 metadata.invalidate_physical_table_meta(&physical_tables);
982 } else {
983 warn!(
984 "Table id {table_id} is missing from table_path_by_id while invalidating table metadata"
985 );
986 }
987 }
988 let error_context = Self::describe_fetch_error(
989 error,
990 &table_bucket,
991 fetch_offset,
992 api_error.message.as_str(),
993 );
994 log_scanner_status.move_bucket_to_end(table_bucket.clone());
995 match error_context.log_level {
996 FetchErrorLogLevel::Debug => {
997 debug!("{}", error_context.log_message);
998 }
999 FetchErrorLogLevel::Warn => {
1000 warn!("{}", error_context.log_message);
1001 }
1002 }
1003 log_fetch_buffer.add_api_error(
1004 table_bucket.clone(),
1005 api_error,
1006 error_context,
1007 fetch_offset,
1008 );
1009 continue;
1010 }
1011
1012 if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info
1014 {
1015 let remote_fetch_info =
1017 RemoteLogFetchInfo::from_proto(remote_log_fetch_info, table_bucket.clone());
1018
1019 let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
1020 Self::pending_remote_fetches(
1021 remote_log_downloader.clone(),
1022 log_fetch_buffer.clone(),
1023 remote_read_context.clone(),
1024 &table_bucket,
1025 remote_fetch_info,
1026 fetch_offset,
1027 high_watermark,
1028 );
1029 } else if fetch_log_for_bucket.records.is_some() {
1030 let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1);
1032 let records = fetch_log_for_bucket.records.unwrap_or(vec![]);
1033 let size_in_bytes = records.len();
1034 let log_record_batch = LogRecordsBatches::new(records);
1035
1036 let completed_fetch = DefaultCompletedFetch::new(
1037 table_bucket.clone(),
1038 log_record_batch,
1039 size_in_bytes,
1040 read_context.clone(),
1041 fetch_offset,
1042 high_watermark,
1043 );
1044 log_fetch_buffer.add(Box::new(completed_fetch));
1045 }
1046 }
1047 }
1048 }
1049
1050 fn pending_remote_fetches(
1051 remote_log_downloader: Arc<RemoteLogDownloader>,
1052 log_fetch_buffer: Arc<LogFetchBuffer>,
1053 read_context: ReadContext,
1054 table_bucket: &TableBucket,
1055 remote_fetch_info: RemoteLogFetchInfo,
1056 fetch_offset: i64,
1057 high_watermark: i64,
1058 ) {
1059 let mut pos_in_log_segment = remote_fetch_info.first_start_pos;
1061 let mut current_fetch_offset = fetch_offset;
1062 for (i, segment) in remote_fetch_info.remote_log_segments.iter().enumerate() {
1063 if i > 0 {
1064 pos_in_log_segment = 0;
1065 current_fetch_offset = segment.start_offset;
1066 }
1067
1068 let download_future = remote_log_downloader
1072 .request_remote_log(&remote_fetch_info.remote_log_tablet_dir, segment);
1073
1074 let table_bucket = table_bucket.clone();
1078 let log_fetch_buffer_clone = log_fetch_buffer.clone();
1079 download_future.on_complete(move || {
1080 log_fetch_buffer_clone.try_complete(&table_bucket);
1081 });
1082
1083 let pending_fetch = RemotePendingFetch::new(
1084 segment.clone(),
1085 download_future,
1086 pos_in_log_segment,
1087 current_fetch_offset,
1088 high_watermark,
1089 read_context.clone(),
1090 );
1091 log_fetch_buffer.pend(Box::new(pending_fetch));
1093 }
1094 }
1095
1096 fn collect_fetches(&self) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> {
1099 let mut result: HashMap<TableBucket, Vec<ScanRecord>> = HashMap::new();
1100 let mut records_remaining = self.max_poll_records;
1101
1102 let collect_result: Result<()> = {
1103 while records_remaining > 0 {
1104 let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
1106
1107 if next_in_line.is_none() || next_in_line.as_ref().unwrap().is_consumed() {
1108 if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
1110 if !completed_fetch.is_initialized() {
1112 let size_in_bytes = completed_fetch.size_in_bytes();
1113 match self.initialize_fetch(completed_fetch) {
1114 Ok(initialized) => {
1115 self.log_fetch_buffer.set_next_in_line_fetch(initialized);
1116 continue;
1117 }
1118 Err(e) => {
1119 if result.is_empty() && size_in_bytes == 0 {
1124 }
1127 return Err(e);
1128 }
1129 }
1130 } else {
1131 self.log_fetch_buffer
1132 .set_next_in_line_fetch(Some(completed_fetch));
1133 }
1134 } else {
1136 break;
1138 }
1139 } else {
1140 if let Some(mut next_fetch) = next_in_line {
1142 let records = match self
1143 .fetch_records_from_fetch(&mut next_fetch, records_remaining)
1144 {
1145 Ok(records) => records,
1146 Err(e) => {
1147 if !next_fetch.is_consumed() {
1148 self.log_fetch_buffer
1149 .set_next_in_line_fetch(Some(next_fetch));
1150 }
1151 return Err(e);
1152 }
1153 };
1154
1155 if !records.is_empty() {
1156 let table_bucket = next_fetch.table_bucket().clone();
1157 let existing = result.entry(table_bucket).or_default();
1159 let records_count = records.len();
1160 existing.extend(records);
1161
1162 records_remaining = records_remaining.saturating_sub(records_count);
1163 }
1164
1165 if !next_fetch.is_consumed() {
1167 self.log_fetch_buffer
1168 .set_next_in_line_fetch(Some(next_fetch));
1169 }
1170 }
1172 }
1173 }
1174 Ok(())
1175 };
1176
1177 match collect_result {
1178 Ok(()) => Ok(result),
1179 Err(e) => {
1180 if result.is_empty() {
1181 Err(e)
1182 } else {
1183 Ok(result)
1184 }
1185 }
1186 }
1187 }
1188
1189 fn initialize_fetch(
1191 &self,
1192 mut completed_fetch: Box<dyn CompletedFetch>,
1193 ) -> Result<Option<Box<dyn CompletedFetch>>> {
1194 if let Some(error) = completed_fetch.take_error() {
1195 return Err(error);
1196 }
1197
1198 let table_bucket = completed_fetch.table_bucket().clone();
1199 let fetch_offset = completed_fetch.next_fetch_offset();
1200
1201 if let Some(api_error) = completed_fetch.api_error() {
1202 let error = FlussError::for_code(api_error.code);
1203 let error_message = api_error.message.as_str();
1204 self.log_scanner_status
1205 .move_bucket_to_end(table_bucket.clone());
1206 let action = completed_fetch
1207 .fetch_error_context()
1208 .map(|context| context.action)
1209 .unwrap_or(FetchErrorAction::Unexpected);
1210 match action {
1211 FetchErrorAction::Ignore => {
1212 return Ok(None);
1213 }
1214 FetchErrorAction::LogOffsetOutOfRange => {
1215 return Err(Error::UnexpectedError {
1216 message: format!(
1217 "The fetching offset {fetch_offset} is out of range: {error_message}"
1218 ),
1219 source: None,
1220 });
1221 }
1222 FetchErrorAction::Authorization => {
1223 return Err(Error::FlussAPIError {
1224 api_error: ApiError {
1225 code: api_error.code,
1226 message: api_error.message.to_string(),
1227 },
1228 });
1229 }
1230 FetchErrorAction::CorruptMessage => {
1231 return Err(Error::UnexpectedError {
1232 message: format!(
1233 "Encountered corrupt message when fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}"
1234 ),
1235 source: None,
1236 });
1237 }
1238 FetchErrorAction::Unexpected => {
1239 return Err(Error::UnexpectedError {
1240 message: format!(
1241 "Unexpected error code {error:?} while fetching at offset {fetch_offset} from bucket {table_bucket}: {error_message}"
1242 ),
1243 source: None,
1244 });
1245 }
1246 }
1247 }
1248
1249 let Some(current_offset) = self.log_scanner_status.get_bucket_offset(&table_bucket) else {
1251 warn!(
1252 "Discarding stale fetch response for bucket {table_bucket:?} since the bucket has been unsubscribed"
1253 );
1254 return Ok(None);
1255 };
1256
1257 if fetch_offset != current_offset {
1259 warn!(
1260 "Discarding stale fetch response for bucket {table_bucket:?} since its offset {fetch_offset} does not match the expected offset {current_offset}"
1261 );
1262 return Ok(None);
1263 }
1264
1265 let high_watermark = completed_fetch.high_watermark();
1267 if high_watermark >= 0 {
1268 self.log_scanner_status
1269 .update_high_watermark(&table_bucket, high_watermark);
1270 }
1271
1272 completed_fetch.set_initialized();
1273 Ok(Some(completed_fetch))
1274 }
1275
1276 fn fetch_records_from_fetch(
1278 &self,
1279 next_in_line_fetch: &mut Box<dyn CompletedFetch>,
1280 max_records: usize,
1281 ) -> Result<Vec<ScanRecord>> {
1282 let table_bucket = next_in_line_fetch.table_bucket().clone();
1283 let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
1284
1285 if current_offset.is_none() {
1286 warn!(
1287 "Ignoring fetched records for {table_bucket:?} since the bucket has been unsubscribed"
1288 );
1289 next_in_line_fetch.drain();
1290 return Ok(Vec::new());
1291 }
1292
1293 let current_offset = current_offset.unwrap();
1294 let fetch_offset = next_in_line_fetch.next_fetch_offset();
1295
1296 if fetch_offset == current_offset {
1298 let records = next_in_line_fetch.fetch_records(max_records)?;
1299 let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
1300
1301 if next_fetch_offset > current_offset {
1302 self.log_scanner_status
1303 .update_offset(&table_bucket, next_fetch_offset);
1304 }
1305
1306 if next_in_line_fetch.is_consumed() && next_in_line_fetch.records_read() > 0 {
1307 self.log_scanner_status
1308 .move_bucket_to_end(table_bucket.clone());
1309 }
1310
1311 Ok(records)
1312 } else {
1313 warn!(
1315 "Ignoring fetched records for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
1316 );
1317 next_in_line_fetch.drain();
1318 Ok(Vec::new())
1319 }
1320 }
1321
1322 fn collect_batches(&self) -> Result<Vec<ScanBatch>> {
1324 const MAX_BATCHES: usize = 100;
1327 const MAX_BYTES: usize = 64 * 1024 * 1024; let mut result: Vec<ScanBatch> = Vec::new();
1329 let mut batches_remaining = MAX_BATCHES;
1330 let mut bytes_consumed: usize = 0;
1331
1332 let collect_result: Result<()> = {
1333 while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
1334 let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
1335
1336 match next_in_line {
1337 Some(mut next_fetch) if !next_fetch.is_consumed() => {
1338 let scan_batches =
1339 self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
1340 let batch_count = scan_batches.len();
1341
1342 if !scan_batches.is_empty() {
1343 let batch_bytes: usize = scan_batches
1345 .iter()
1346 .map(|sb| sb.batch().get_array_memory_size())
1347 .sum();
1348 bytes_consumed += batch_bytes;
1349
1350 result.extend(scan_batches);
1351 batches_remaining = batches_remaining.saturating_sub(batch_count);
1352 }
1353
1354 if !next_fetch.is_consumed() {
1355 self.log_fetch_buffer
1356 .set_next_in_line_fetch(Some(next_fetch));
1357 }
1358 }
1359 _ => {
1360 if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
1361 if !completed_fetch.is_initialized() {
1362 let size_in_bytes = completed_fetch.size_in_bytes();
1363 match self.initialize_fetch(completed_fetch) {
1364 Ok(initialized) => {
1365 self.log_fetch_buffer.set_next_in_line_fetch(initialized);
1366 continue;
1367 }
1368 Err(e) => {
1369 if result.is_empty() && size_in_bytes == 0 {
1370 continue;
1371 }
1372 return Err(e);
1373 }
1374 }
1375 } else {
1376 self.log_fetch_buffer
1377 .set_next_in_line_fetch(Some(completed_fetch));
1378 }
1379 } else {
1380 break;
1381 }
1382 }
1383 }
1384 }
1385 Ok(())
1386 };
1387
1388 match collect_result {
1389 Ok(()) => Ok(result),
1390 Err(e) => {
1391 if result.is_empty() {
1392 Err(e)
1393 } else {
1394 Ok(result)
1395 }
1396 }
1397 }
1398 }
1399
1400 fn fetch_batches_from_fetch(
1401 &self,
1402 next_in_line_fetch: &mut Box<dyn CompletedFetch>,
1403 max_batches: usize,
1404 ) -> Result<Vec<ScanBatch>> {
1405 let table_bucket = next_in_line_fetch.table_bucket().clone();
1406 let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
1407
1408 if current_offset.is_none() {
1409 warn!(
1410 "Ignoring fetched batches for {table_bucket:?} since the bucket has been unsubscribed"
1411 );
1412 next_in_line_fetch.drain();
1413 return Ok(Vec::new());
1414 }
1415
1416 let current_offset = current_offset.unwrap();
1417 let fetch_offset = next_in_line_fetch.next_fetch_offset();
1418
1419 if fetch_offset == current_offset {
1420 let batches_with_offsets = next_in_line_fetch.fetch_batches(max_batches)?;
1421 let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
1422
1423 if next_fetch_offset > current_offset {
1424 self.log_scanner_status
1425 .update_offset(&table_bucket, next_fetch_offset);
1426 }
1427
1428 Ok(batches_with_offsets
1430 .into_iter()
1431 .map(|(batch, base_offset)| {
1432 ScanBatch::new(table_bucket.clone(), batch, base_offset)
1433 })
1434 .collect())
1435 } else {
1436 warn!(
1437 "Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
1438 );
1439 next_in_line_fetch.drain();
1440 Ok(Vec::new())
1441 }
1442 }
1443
1444 async fn prepare_fetch_log_requests(&self) -> HashMap<i32, FetchLogRequest> {
1445 let mut fetch_log_req_for_buckets = HashMap::new();
1446 let mut table_id = None;
1447 let mut ready_for_fetch_count = 0;
1448 for bucket in self.fetchable_buckets() {
1449 if table_id.is_none() {
1450 table_id = Some(bucket.table_id());
1451 }
1452
1453 let offset = match self.log_scanner_status.get_bucket_offset(&bucket) {
1454 Some(offset) => offset,
1455 None => {
1456 debug!(
1457 "Skipping fetch request for bucket {bucket} because the bucket has been unsubscribed."
1458 );
1459 continue;
1460 }
1461 };
1462
1463 match self.get_table_bucket_leader(&bucket) {
1464 None => {
1465 log::trace!(
1466 "Skipping fetch request for bucket {bucket} because leader is not available."
1467 )
1468 }
1469 Some(leader) => {
1470 if self
1471 .nodes_with_pending_fetch_requests
1472 .lock()
1473 .contains(&leader)
1474 {
1475 log::trace!(
1476 "Skipping fetch request for bucket {bucket} because previous request to server {leader} has not been processed."
1477 )
1478 } else {
1479 let fetch_log_req_for_bucket = PbFetchLogReqForBucket {
1480 partition_id: bucket.partition_id(),
1481 bucket_id: bucket.bucket_id(),
1482 fetch_offset: offset,
1483 max_fetch_bytes: self.fetch_max_bytes_for_bucket,
1484 };
1485
1486 fetch_log_req_for_buckets
1487 .entry(leader)
1488 .or_insert_with(Vec::new)
1489 .push(fetch_log_req_for_bucket);
1490 ready_for_fetch_count += 1;
1491 }
1492 }
1493 }
1494 }
1495
1496 if ready_for_fetch_count == 0 {
1497 HashMap::new()
1498 } else {
1499 let (projection_enabled, projected_fields) =
1500 match self.read_context.project_fields_in_order() {
1501 None => (false, vec![]),
1502 Some(fields) => (true, fields.iter().map(|&i| i as i32).collect()),
1503 };
1504
1505 fetch_log_req_for_buckets
1506 .into_iter()
1507 .map(|(leader_id, feq_for_buckets)| {
1508 let req_for_table = PbFetchLogReqForTable {
1509 table_id: table_id.unwrap(),
1510 projection_pushdown_enabled: projection_enabled,
1511 projected_fields: projected_fields.clone(),
1512 buckets_req: feq_for_buckets,
1513 };
1514
1515 let fetch_log_request = FetchLogRequest {
1516 follower_server_id: -1,
1517 max_bytes: self.fetch_max_bytes,
1518 tables_req: vec![req_for_table],
1519 max_wait_ms: Some(self.fetch_wait_max_time_ms),
1520 min_bytes: Some(self.fetch_min_bytes),
1521 };
1522 (leader_id, fetch_log_request)
1523 })
1524 .collect()
1525 }
1526 }
1527
1528 fn fetchable_buckets(&self) -> Vec<TableBucket> {
1529 let buffered = self.log_fetch_buffer.buffered_buckets();
1531 let buffered_set: HashSet<TableBucket> = buffered.into_iter().collect();
1532 self.log_scanner_status
1533 .fetchable_buckets(|tb| !buffered_set.contains(tb))
1534 }
1535
1536 fn get_table_bucket_leader(&self, tb: &TableBucket) -> Option<i32> {
1537 let cluster = self.metadata.get_cluster();
1538 cluster.leader_for(tb).map(|leader| leader.id())
1539 }
1540}
1541
/// Tracks the scan position of every bucket a log scanner is subscribed to.
///
/// Each subscribed bucket maps to a shared [`BucketScanStatus`] holding its next fetch offset
/// and last recorded high watermark. The map is behind an `Arc<RwLock<..>>`; lookups take the
/// read lock and (un)subscription or reordering takes the write lock.
pub struct LogScannerStatus {
    // Subscribed buckets and their statuses, kept in the order managed by `FairBucketStatusMap`.
    bucket_status_map: Arc<RwLock<FairBucketStatusMap<BucketScanStatus>>>,
}
1545
1546#[allow(dead_code)]
1547impl LogScannerStatus {
1548 pub fn new() -> Self {
1549 Self {
1550 bucket_status_map: Arc::new(RwLock::new(FairBucketStatusMap::new())),
1551 }
1552 }
1553
1554 pub fn prepare_to_poll(&self) -> bool {
1555 let map = self.bucket_status_map.read();
1556 map.size() > 0
1557 }
1558
1559 pub fn move_bucket_to_end(&self, table_bucket: TableBucket) {
1560 let mut map = self.bucket_status_map.write();
1561 map.move_to_end(table_bucket);
1562 }
1563
1564 pub fn get_bucket_offset(&self, table_bucket: &TableBucket) -> Option<i64> {
1566 let map = self.bucket_status_map.read();
1567 map.status_value(table_bucket).map(|status| status.offset())
1568 }
1569
1570 pub fn update_high_watermark(&self, table_bucket: &TableBucket, high_watermark: i64) {
1571 if let Some(status) = self.get_status(table_bucket) {
1572 status.set_high_watermark(high_watermark);
1573 }
1574 }
1575
1576 pub fn update_offset(&self, table_bucket: &TableBucket, offset: i64) {
1577 if let Some(status) = self.get_status(table_bucket) {
1578 status.set_offset(offset);
1579 }
1580 }
1581
1582 pub fn assign_scan_buckets(&self, scan_bucket_offsets: HashMap<TableBucket, i64>) {
1583 let mut map = self.bucket_status_map.write();
1584 for (bucket, offset) in scan_bucket_offsets {
1585 let status = map
1586 .status_value(&bucket)
1587 .cloned()
1588 .unwrap_or_else(|| Arc::new(BucketScanStatus::new(offset)));
1589 status.set_offset(offset);
1590 map.update(bucket, status);
1591 }
1592 }
1593
1594 pub fn assign_scan_bucket(&self, table_bucket: TableBucket, offset: i64) {
1595 let status = Arc::new(BucketScanStatus::new(offset));
1596 self.bucket_status_map.write().update(table_bucket, status);
1597 }
1598
1599 pub fn unassign_scan_buckets(&self, buckets: &[TableBucket]) {
1601 let mut map = self.bucket_status_map.write();
1602 for bucket in buckets {
1603 map.remove(bucket);
1604 }
1605 }
1606
1607 pub fn fetchable_buckets<F>(&self, is_available: F) -> Vec<TableBucket>
1609 where
1610 F: Fn(&TableBucket) -> bool,
1611 {
1612 let map = self.bucket_status_map.read();
1613 let mut result = Vec::new();
1614 map.for_each(|bucket, _| {
1615 if is_available(bucket) {
1616 result.push(bucket.clone());
1617 }
1618 });
1619 result
1620 }
1621
1622 pub fn get_all_subscriptions(&self) -> Vec<(TableBucket, i64)> {
1624 let map = self.bucket_status_map.read();
1625 let mut result = Vec::new();
1626 map.for_each(|bucket, status| {
1627 result.push((bucket.clone(), status.offset()));
1628 });
1629 result
1630 }
1631
1632 fn get_status(&self, table_bucket: &TableBucket) -> Option<Arc<BucketScanStatus>> {
1634 let map = self.bucket_status_map.read();
1635 map.status_value(table_bucket).cloned()
1636 }
1637}
1638
1639impl Default for LogScannerStatus {
1640 fn default() -> Self {
1641 Self::new()
1642 }
1643}
1644
/// Scan position of a single subscribed bucket.
///
/// Both values sit behind their own `RwLock`, so they can be read and updated through a
/// shared reference (the status is shared via `Arc` by `LogScannerStatus`).
#[derive(Debug)]
// NOTE(review): silences dead-code warnings; presumably some fields/accessors are unused
// outside tests — confirm.
#[allow(dead_code)]
pub struct BucketScanStatus {
    // Next offset to fetch for the bucket.
    offset: RwLock<i64>,
    // Last high watermark recorded via `set_high_watermark`; starts at 0 (see `new`).
    high_watermark: RwLock<i64>,
}
1651
1652#[allow(dead_code)]
1653impl BucketScanStatus {
1654 pub fn new(offset: i64) -> Self {
1655 Self {
1656 offset: RwLock::new(offset),
1657 high_watermark: RwLock::new(0),
1658 }
1659 }
1660
1661 pub fn offset(&self) -> i64 {
1662 *self.offset.read()
1663 }
1664
1665 pub fn set_offset(&self, offset: i64) {
1666 *self.offset.write() = offset
1667 }
1668
1669 pub fn high_watermark(&self) -> i64 {
1670 *self.high_watermark.read()
1671 }
1672
1673 pub fn set_high_watermark(&self, high_watermark: i64) {
1674 *self.high_watermark.write() = high_watermark
1675 }
1676}
1677
1678fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> Result<()> {
1679 if table_info.schema.primary_key().is_some() {
1680 return Err(UnsupportedOperation {
1681 message: format!("Table {table_path} is not a Log Table and doesn't support scan."),
1682 });
1683 }
1684
1685 let log_format = table_info.table_config.get_log_format()?;
1686 if LogFormat::ARROW != log_format {
1687 return Err(UnsupportedOperation {
1688 message: format!(
1689 "Scan is only supported for ARROW format and table {table_path} uses {log_format} format"
1690 ),
1691 });
1692 }
1693
1694 Ok(())
1695}
1696
// Unit tests for the log fetcher, the scanner status bookkeeping and `validate_scan_support`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::WriteRecord;
    use crate::client::metadata::Metadata;
    use crate::compression::{
        ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
    };
    use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath};
    use crate::record::MemoryLogRecordsArrowBuilder;
    use crate::row::{Datum, GenericRow};
    use crate::rpc::FlussError;
    use crate::test_utils::{build_cluster_arc, build_table_info};

    /// Builds an uncompressed Arrow log-records buffer containing a single appended row whose
    /// only value is `Datum::Int32(1)`.
    ///
    /// NOTE(review): the single-value row assumes `build_table_info` produces a table with one
    /// INT column — confirm against `test_utils`.
    fn build_records(table_info: &TableInfo, table_path: Arc<TablePath>) -> Result<Vec<u8>> {
        let mut builder = MemoryLogRecordsArrowBuilder::new(
            1,
            table_info.get_row_type(),
            false,
            ArrowCompressionInfo {
                compression_type: ArrowCompressionType::None,
                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
            },
        )?;
        let physical_table_path = Arc::new(PhysicalTablePath::of(table_path));
        let row = GenericRow {
            values: vec![Datum::Int32(1)],
        };
        let record =
            WriteRecord::for_append(Arc::new(table_info.clone()), physical_table_path, 1, &row);
        builder.append(&record)?;
        builder.build()
    }

    /// A buffered one-record fetch for a bucket subscribed at offset 0 is returned by
    /// `collect_fetches`, and the bucket's offset advances to 1.
    #[tokio::test]
    async fn collect_fetches_updates_offset() -> Result<()> {
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = build_table_info(table_path.clone(), 1, 1);
        let cluster = build_cluster_arc(&table_path, 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster));
        let status = Arc::new(LogScannerStatus::new());
        let fetcher = LogFetcher::new(
            table_info.clone(),
            Arc::new(RpcClient::new()),
            metadata,
            status.clone(),
            &crate::config::Config::default(),
            None,
        )?;

        let bucket = TableBucket::new(1, 0);
        status.assign_scan_bucket(bucket.clone(), 0);

        let data = build_records(&table_info, Arc::new(table_path))?;
        let log_records = LogRecordsBatches::new(data.clone());
        let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
        let completed =
            DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0);
        fetcher.log_fetch_buffer.add(Box::new(completed));

        let fetched = fetcher.collect_fetches()?;
        assert_eq!(fetched.get(&bucket).unwrap().len(), 1);
        assert_eq!(status.get_bucket_offset(&bucket), Some(1));
        Ok(())
    }

    /// A fetch for a bucket that was never subscribed yields no records and is drained, so it
    /// reports itself as consumed.
    #[tokio::test]
    async fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> {
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = build_table_info(table_path.clone(), 1, 1);
        let cluster = build_cluster_arc(&table_path, 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster));
        let status = Arc::new(LogScannerStatus::new());
        let fetcher = LogFetcher::new(
            table_info.clone(),
            Arc::new(RpcClient::new()),
            metadata,
            status,
            &crate::config::Config::default(),
            None,
        )?;

        let bucket = TableBucket::new(1, 0);
        let data = build_records(&table_info, Arc::new(table_path))?;
        let log_records = LogRecordsBatches::new(data.clone());
        let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
        let mut completed: Box<dyn CompletedFetch> = Box::new(DefaultCompletedFetch::new(
            bucket,
            log_records,
            data.len(),
            read_context,
            0,
            0,
        ));

        let records = fetcher.fetch_records_from_fetch(&mut completed, 10)?;
        assert!(records.is_empty());
        assert!(completed.is_consumed());
        Ok(())
    }

    /// No request is built when the pending-request set contains server 1.
    ///
    /// NOTE(review): assumes `build_cluster_arc` makes server 1 the bucket's leader — confirm.
    #[tokio::test]
    async fn prepare_fetch_log_requests_skips_pending() -> Result<()> {
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = build_table_info(table_path.clone(), 1, 1);
        let cluster = build_cluster_arc(&table_path, 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster));
        let status = Arc::new(LogScannerStatus::new());
        status.assign_scan_bucket(TableBucket::new(1, 0), 0);
        let fetcher = LogFetcher::new(
            table_info,
            Arc::new(RpcClient::new()),
            metadata,
            status,
            &crate::config::Config::default(),
            None,
        )?;

        fetcher.nodes_with_pending_fetch_requests.lock().insert(1);

        let requests = fetcher.prepare_fetch_log_requests().await;
        assert!(requests.is_empty());
        Ok(())
    }

    /// A bucket-level `AuthorizationException` in a fetch response is buffered as a completed
    /// fetch whose api error carries that code.
    #[tokio::test]
    async fn handle_fetch_response_sets_error() -> Result<()> {
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = build_table_info(table_path.clone(), 1, 1);
        let cluster = build_cluster_arc(&table_path, 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster));
        let status = Arc::new(LogScannerStatus::new());
        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
        let fetcher = LogFetcher::new(
            table_info.clone(),
            Arc::new(RpcClient::new()),
            metadata.clone(),
            status.clone(),
            &crate::config::Config::default(),
            None,
        )?;

        let response = crate::proto::FetchLogResponse {
            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
                table_id: 1,
                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
                    partition_id: None,
                    bucket_id: 0,
                    error_code: Some(FlussError::AuthorizationException.code()),
                    error_message: Some("denied".to_string()),
                    high_watermark: None,
                    log_start_offset: None,
                    remote_log_fetch_info: None,
                    records: None,
                }],
            }],
        };

        let response_context = FetchResponseContext {
            metadata: metadata.clone(),
            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
            log_scanner_status: fetcher.log_scanner_status.clone(),
            read_context: fetcher.read_context.clone(),
            remote_read_context: fetcher.remote_read_context.clone(),
            remote_log_downloader: fetcher.remote_log_downloader.clone(),
        };

        LogFetcher::handle_fetch_response(response, response_context).await;

        let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch");
        let api_error = completed.api_error().expect("api error");
        assert_eq!(api_error.code, FlussError::AuthorizationException.code());
        Ok(())
    }

    /// A `NotLeaderOrFollower` error in a fetch response clears the cached leader of the
    /// affected bucket.
    #[tokio::test]
    async fn handle_fetch_response_invalidates_table_meta() -> Result<()> {
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = build_table_info(table_path.clone(), 1, 1);
        let cluster = build_cluster_arc(&table_path, 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));
        let status = Arc::new(LogScannerStatus::new());
        status.assign_scan_bucket(TableBucket::new(1, 0), 5);
        let fetcher = LogFetcher::new(
            table_info.clone(),
            Arc::new(RpcClient::new()),
            metadata.clone(),
            status.clone(),
            &crate::config::Config::default(),
            None,
        )?;

        let bucket = TableBucket::new(1, 0);
        assert!(metadata.leader_for(&table_path, &bucket).await?.is_some());

        let response = crate::proto::FetchLogResponse {
            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
                table_id: 1,
                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
                    partition_id: None,
                    bucket_id: 0,
                    error_code: Some(FlussError::NotLeaderOrFollower.code()),
                    error_message: Some("not leader".to_string()),
                    high_watermark: None,
                    log_start_offset: None,
                    remote_log_fetch_info: None,
                    records: None,
                }],
            }],
        };

        let response_context = FetchResponseContext {
            metadata: metadata.clone(),
            log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
            log_scanner_status: fetcher.log_scanner_status.clone(),
            read_context: fetcher.read_context.clone(),
            remote_read_context: fetcher.remote_read_context.clone(),
            remote_log_downloader: fetcher.remote_log_downloader.clone(),
        };

        LogFetcher::handle_fetch_response(response, response_context).await;

        assert!(metadata.get_cluster().leader_for(&bucket).is_none());
        Ok(())
    }

    /// Builds a `TableInfo` for `test_db.test_table` with columns `id INT` and `name STRING`.
    ///
    /// `has_primary_key` makes `id` the primary key. `log_format`, when given, is stored as the
    /// `table.log.format` property.
    fn create_test_table_info(
        has_primary_key: bool,
        log_format: Option<&str>,
    ) -> (TableInfo, TablePath) {
        let mut schema_builder = Schema::builder()
            .column("id", DataTypes::int())
            .column("name", DataTypes::string());

        if has_primary_key {
            schema_builder = schema_builder.primary_key(vec!["id"]);
        }

        let schema = schema_builder.build().unwrap();
        let table_path = TablePath::new("test_db", "test_table");

        let mut properties = HashMap::new();
        if let Some(format) = log_format {
            properties.insert("table.log.format".to_string(), format.to_string());
        }

        let table_info = TableInfo::new(
            table_path.clone(),
            1,
            1,
            schema,
            vec![],
            Arc::from(vec![]),
            1,
            properties,
            HashMap::new(),
            None,
            0,
            0,
        );

        (table_info, table_path)
    }

    /// Primary-key tables and non-ARROW log formats are rejected with `UnsupportedOperation`.
    /// Log tables with no explicit format or an explicit ARROW format are accepted.
    #[test]
    fn test_validate_scan_support() {
        let (table_info, table_path) = create_test_table_info(true, Some("ARROW"));
        let result = validate_scan_support(&table_path, &table_info);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, UnsupportedOperation { .. }));
        assert!(err.to_string().contains(
            format!("Table {table_path} is not a Log Table and doesn't support scan.").as_str()
        ));

        let (table_info, table_path) = create_test_table_info(false, Some("INDEXED"));
        let result = validate_scan_support(&table_path, &table_info);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, UnsupportedOperation { .. }));
        assert!(err.to_string().contains(format!("Scan is only supported for ARROW format and table {table_path} uses INDEXED format").as_str()));

        let (table_info, table_path) = create_test_table_info(false, None);
        let result = validate_scan_support(&table_path, &table_info);
        assert!(result.is_ok());

        let (table_info, table_path) = create_test_table_info(false, Some("ARROW"));
        let result = validate_scan_support(&table_path, &table_info);
        assert!(result.is_ok());
    }

    /// The scanner fetch settings from `Config` are copied into every built request:
    /// - max bytes;
    /// - min bytes;
    /// - max wait time;
    /// - per-bucket max bytes.
    #[tokio::test]
    async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> Result<()> {
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = build_table_info(table_path.clone(), 1, 1);
        let cluster = build_cluster_arc(&table_path, 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster));
        let status = Arc::new(LogScannerStatus::new());
        status.assign_scan_bucket(TableBucket::new(1, 0), 0);

        let config = crate::config::Config {
            scanner_log_fetch_max_bytes: 1234,
            scanner_log_fetch_min_bytes: 7,
            scanner_log_fetch_wait_max_time_ms: 89,
            scanner_log_fetch_max_bytes_for_bucket: 512,
            ..crate::config::Config::default()
        };

        let fetcher = LogFetcher::new(
            table_info,
            Arc::new(RpcClient::new()),
            metadata,
            status,
            &config,
            None,
        )?;

        let requests = fetcher.prepare_fetch_log_requests().await;
        assert!(!requests.is_empty());
        for req in requests.values() {
            assert_eq!(req.max_bytes, 1234);
            assert_eq!(req.min_bytes, Some(7));
            assert_eq!(req.max_wait_ms, Some(89));

            for table_req in &req.tables_req {
                for bucket_req in &table_req.buckets_req {
                    assert_eq!(bucket_req.max_fetch_bytes, 512);
                }
            }
        }
        Ok(())
    }
}