use std::collections::HashSet;
use arrow::array::RecordBatch;
use futures::{TryStreamExt, stream::BoxStream};
use iceberg::{
arrow::ArrowReaderBuilder,
expr::Predicate,
spec::{ManifestStatus, Operation},
table::Table,
};
use tracing::debug;
use crate::error::Result;
/// Boxed, `'static` stream of Arrow [`RecordBatch`]es, as produced by Iceberg table scans.
pub type IcebergRecordBatchStream = BoxStream<'static, iceberg::Result<RecordBatch>>;
/// Stream all record batches from the table's current snapshot.
///
/// # Errors
/// Returns an error if the scan cannot be built or the Arrow stream
/// cannot be created.
pub async fn scan_table(table: &Table) -> Result<IcebergRecordBatchStream> {
    Ok(table.scan().build()?.to_arrow().await?)
}
/// Stream all record batches from a specific snapshot of the table.
///
/// # Errors
/// Returns an error if the scan cannot be built for `snapshot_id` or the
/// Arrow stream cannot be created.
pub async fn scan_snapshot(table: &Table, snapshot_id: i64) -> Result<IcebergRecordBatchStream> {
    let pinned_scan = table.scan().snapshot_id(snapshot_id).build()?;
    Ok(pinned_scan.to_arrow().await?)
}
/// Stream record batches from the current snapshot, projected to `columns`.
///
/// # Errors
/// Returns an error if the projected scan cannot be built or the Arrow
/// stream cannot be created.
pub async fn scan_columns(table: &Table, columns: Vec<&str>) -> Result<IcebergRecordBatchStream> {
    let projected = table.scan().select(columns).build()?;
    Ok(projected.to_arrow().await?)
}
/// Stream the data committed after `after_snapshot_id`, up to the table's
/// current snapshot.
///
/// Convenience wrapper over [`scan_snapshot_range`] with no explicit upper
/// bound.
///
/// # Errors
/// Propagates any error from [`scan_snapshot_range`].
pub async fn scan_since_snapshot(
    table: &Table,
    after_snapshot_id: i64,
) -> Result<IcebergRecordBatchStream> {
    // `None` means "use the current snapshot" as the upper bound.
    let upper_bound = None;
    scan_snapshot_range(table, after_snapshot_id, upper_bound).await
}
/// Incrementally scan the data files added after `after_snapshot_id`
/// (exclusive) up to `to_snapshot_id` (inclusive), streamed as Arrow record
/// batches.
///
/// When `to_snapshot_id` is `None`, the table's current snapshot is used as
/// the upper bound. Manifests and entries are windowed by sequence number,
/// and manifests whose sequence number belongs to a `Replace` snapshot in
/// the range (compaction rewrites) are skipped so already-seen rows are not
/// re-emitted.
///
/// # Errors
/// Fails when either bound snapshot id is not present in the table metadata,
/// when the table has no current snapshot (and no explicit upper bound was
/// given), or when manifest loading / scan planning / Arrow reading fails.
pub async fn scan_snapshot_range(
    table: &Table,
    after_snapshot_id: i64,
    to_snapshot_id: Option<i64>,
) -> Result<IcebergRecordBatchStream> {
    // Resolve the exclusive lower bound; its sequence number defines the
    // start of the incremental window.
    let after_snapshot = table
        .metadata()
        .snapshot_by_id(after_snapshot_id)
        .ok_or_else(|| {
            iceberg::Error::new(
                iceberg::ErrorKind::DataInvalid,
                format!("checkpoint snapshot {after_snapshot_id} not found in table metadata"),
            )
        })?;
    let after_seq_num = after_snapshot.sequence_number();
    // Resolve the inclusive upper bound: the requested snapshot, or the
    // table's current snapshot when none was given.
    let to_snapshot = match to_snapshot_id {
        Some(id) => table.metadata().snapshot_by_id(id).ok_or_else(|| {
            iceberg::Error::new(
                iceberg::ErrorKind::DataInvalid,
                format!("upper-bound snapshot {id} not found in table metadata"),
            )
        })?,
        None => table.metadata().current_snapshot().ok_or_else(|| {
            iceberg::Error::new(
                iceberg::ErrorKind::DataInvalid,
                "table has no current snapshot",
            )
        })?,
    };
    let to_id = to_snapshot.snapshot_id();
    let to_seq_num = to_snapshot.sequence_number();
    // An empty or inverted window means there is nothing new to read.
    if to_seq_num <= after_seq_num {
        debug!(
            after_snapshot_id,
            after_seq_num,
            to_snapshot_id = to_id,
            to_seq_num,
            "upper-bound snapshot is not ahead of lower-bound, returning empty stream"
        );
        return Ok(Box::pin(futures::stream::empty()));
    }
    debug!(
        after_snapshot_id,
        after_seq_num,
        to_snapshot_id = to_id,
        to_seq_num,
        "scanning incremental data in snapshot range"
    );
    // Walk the parent chain from the upper bound back toward the lower
    // bound, collecting the sequence numbers of `Replace` (compaction)
    // snapshots; manifests with those sequence numbers only rewrite
    // existing data and must not be re-emitted.
    // NOTE(review): if `after_snapshot_id` is not an ancestor of `to_id`
    // (e.g. a branch switch), the walk silently stops at the root — looks
    // like a best-effort choice; confirm that is intended.
    let mut replace_seq_nums: HashSet<i64> = HashSet::new();
    {
        let mut snap_id = Some(to_id);
        while let Some(id) = snap_id {
            if id == after_snapshot_id {
                break;
            }
            if let Some(snapshot) = table.metadata().snapshot_by_id(id) {
                if snapshot.summary().operation == Operation::Replace {
                    replace_seq_nums.insert(snapshot.sequence_number());
                }
                snap_id = snapshot.parent_snapshot_id();
            } else {
                // Parent missing from metadata (e.g. expired snapshot):
                // stop walking rather than erroring out.
                break;
            }
        }
    }
    // The upper-bound snapshot's manifest list covers every live manifest;
    // filter it down to the (after, to] sequence-number window.
    let manifest_list = to_snapshot
        .load_manifest_list(table.file_io(), table.metadata())
        .await?;
    let mut new_file_paths: HashSet<String> = HashSet::new();
    for manifest_file in manifest_list.entries() {
        if manifest_file.sequence_number <= after_seq_num {
            continue;
        }
        if manifest_file.sequence_number > to_seq_num {
            continue;
        }
        if replace_seq_nums.contains(&manifest_file.sequence_number) {
            debug!(
                seq = manifest_file.sequence_number,
                "skipping compaction manifest"
            );
            continue;
        }
        let manifest = manifest_file.load_manifest(table.file_io()).await?;
        for entry in manifest.entries() {
            // Only files newly added in this window count as incremental
            // data; existing/deleted entries are skipped.
            if entry.status() != ManifestStatus::Added {
                continue;
            }
            // Entries carry their own (optional) sequence number, which can
            // differ from the manifest's; re-check it against the window.
            // Entries with no sequence number fall through and are kept.
            if let Some(entry_seq) = entry.sequence_number()
                && (entry_seq <= after_seq_num || entry_seq > to_seq_num)
            {
                continue;
            }
            new_file_paths.insert(entry.file_path().to_string());
        }
    }
    debug!(
        new_files = new_file_paths.len(),
        "identified data files in snapshot range"
    );
    if new_file_paths.is_empty() {
        return Ok(Box::pin(futures::stream::empty()));
    }
    // Plan a full scan pinned at the upper-bound snapshot, then keep only
    // the tasks whose data file is one of the newly added paths, and read
    // those through the Arrow reader.
    let scan = table.scan().snapshot_id(to_id).build()?;
    let file_tasks = scan.plan_files().await?;
    let filtered_tasks = file_tasks.try_filter(move |task| {
        futures::future::ready(new_file_paths.contains(&task.data_file_path))
    });
    let reader = ArrowReaderBuilder::new(table.file_io().clone()).build();
    let stream = reader.read(Box::pin(filtered_tasks))?;
    Ok(stream)
}
/// Return the id of the snapshot with the lowest sequence number, or `None`
/// if the table has no snapshots.
pub fn earliest_snapshot(table: &Table) -> Option<i64> {
    let snapshots = table.metadata().snapshots();
    snapshots
        .map(|snap| (snap.sequence_number(), snap.snapshot_id()))
        .min_by_key(|&(seq, _)| seq)
        .map(|(_, id)| id)
}
/// Return the id of the latest snapshot committed at or before
/// `timestamp_ms`, or `None` if no snapshot qualifies.
pub fn snapshot_at_timestamp(table: &Table, timestamp_ms: i64) -> Option<i64> {
    // Track the best candidate as (timestamp, id); a later-or-equal
    // timestamp replaces the current best, matching `max_by_key` semantics
    // (last element wins on ties).
    let mut best: Option<(i64, i64)> = None;
    for snap in table.metadata().snapshots() {
        let ts = snap.timestamp_ms();
        if ts > timestamp_ms {
            continue;
        }
        match best {
            Some((best_ts, _)) if ts < best_ts => {}
            _ => best = Some((ts, snap.snapshot_id())),
        }
    }
    best.map(|(_, id)| id)
}
pub async fn scan_at_timestamp(
table: &Table,
timestamp_ms: i64,
) -> Result<IcebergRecordBatchStream> {
let snapshot_id = snapshot_at_timestamp(table, timestamp_ms).ok_or_else(|| {
iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
format!("no snapshot found at or before timestamp {timestamp_ms}"),
)
})?;
debug!(
snapshot_id,
timestamp_ms, "time-travel scan resolved to snapshot"
);
scan_snapshot(table, snapshot_id).await
}
/// Stream record batches from the current snapshot, restricted to rows
/// matching `filter`.
///
/// # Errors
/// Returns an error if the filtered scan cannot be built or the Arrow
/// stream cannot be created.
pub async fn scan_with_filter(
    table: &Table,
    filter: Predicate,
) -> Result<IcebergRecordBatchStream> {
    let filtered_scan = table.scan().with_filter(filter).build()?;
    Ok(filtered_scan.to_arrow().await?)
}