use std::sync::Arc;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize, Serializer};
use crate::Result;
use crate::expr::BoundPredicate;
use crate::spec::{
DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema,
SchemaRef, Struct,
};
/// Boxed, `'static` stream of scan tasks.
///
/// Each item is a crate-level `Result`, so an individual task may fail
/// without ending the stream's type-level contract.
pub type FileScanTaskStream = BoxStream<'static, Result<FileScanTask>>;
fn serialize_not_implemented<S, T>(_: &T, _: S) -> std::result::Result<S::Ok, S::Error>
where S: Serializer {
Err(serde::ser::Error::custom(
"Serialization not implemented for this field",
))
}
fn deserialize_not_implemented<'de, D, T>(_: D) -> std::result::Result<T, D::Error>
where D: serde::Deserializer<'de> {
Err(serde::de::Error::custom(
"Deserialization not implemented for this field",
))
}
/// A single unit of scan work over one data file, carrying everything the
/// reader needs: the file location and format, the schema and projected field
/// ids, an optional filter, and any associated delete files.
///
/// Field declaration order is significant: serde and `Debug` emit fields in
/// this order.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct FileScanTask {
    /// Size of the data file, in bytes.
    pub file_size_in_bytes: u64,
    /// Start offset within the data file (presumably in bytes — confirm with the reader).
    pub start: u64,
    /// Length of the region to read, starting at `start` (units as for `start`).
    pub length: u64,
    /// Number of records, when known.
    pub record_count: Option<u64>,
    /// Location of the data file to read.
    pub data_file_path: String,
    /// On-disk format of the data file.
    pub data_file_format: DataFileFormat,
    /// Schema the task is read against.
    pub schema: SchemaRef,
    /// Field ids to project out of the data file.
    pub project_field_ids: Vec<i32>,
    /// Optional bound filter; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate: Option<BoundPredicate>,
    /// Delete files that apply to this task.
    pub deletes: Vec<FileScanTaskDeleteFile>,
    /// Partition value. Not serializable: `None` is skipped on output and
    /// defaults on input; a `Some` value makes (de)serialization fail.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_not_implemented",
        deserialize_with = "deserialize_not_implemented"
    )]
    pub partition: Option<Struct>,
    /// Partition spec. Same (de)serialization restrictions as `partition`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_not_implemented",
        deserialize_with = "deserialize_not_implemented"
    )]
    pub partition_spec: Option<Arc<PartitionSpec>>,
    /// Name mapping. Same (de)serialization restrictions as `partition`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_not_implemented",
        deserialize_with = "deserialize_not_implemented"
    )]
    pub name_mapping: Option<Arc<NameMapping>>,
    /// Case-sensitivity flag (presumably for column-name matching — confirm at use sites).
    pub case_sensitive: bool,
}
impl FileScanTask {
pub fn data_file_path(&self) -> &str {
&self.data_file_path
}
pub fn project_field_ids(&self) -> &[i32] {
&self.project_field_ids
}
pub fn predicate(&self) -> Option<&BoundPredicate> {
self.predicate.as_ref()
}
pub fn schema(&self) -> &Schema {
&self.schema
}
pub fn schema_ref(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Crate-internal pairing of a delete file's manifest entry with a partition
/// spec id; converted into a [`FileScanTaskDeleteFile`] via `From`.
#[derive(Debug)]
pub(crate) struct DeleteFileContext {
    /// Manifest entry for the delete file (source of its path, size, content type and equality ids).
    pub(crate) manifest_entry: ManifestEntryRef,
    /// Partition spec id copied verbatim into the resulting `FileScanTaskDeleteFile`.
    pub(crate) partition_spec_id: i32,
}
impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
fn from(ctx: &DeleteFileContext) -> Self {
FileScanTaskDeleteFile {
file_path: ctx.manifest_entry.file_path().to_string(),
file_size_in_bytes: ctx.manifest_entry.file_size_in_bytes(),
file_type: ctx.manifest_entry.content_type(),
partition_spec_id: ctx.partition_spec_id,
equality_ids: ctx.manifest_entry.data_file.equality_ids.clone(),
}
}
}
/// Serializable description of a delete file attached to a [`FileScanTask`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FileScanTaskDeleteFile {
    /// Location of the delete file.
    pub file_path: String,
    /// Size of the delete file, in bytes.
    pub file_size_in_bytes: u64,
    /// Content type of the file (taken from the manifest entry when built from a `DeleteFileContext`).
    pub file_type: DataContentType,
    /// Id of the partition spec associated with the delete file.
    pub partition_spec_id: i32,
    /// Equality field ids, if any (presumably only set for equality deletes — confirm).
    pub equality_ids: Option<Vec<i32>>,
}