use std::sync::Arc;
use crate::input::proto::substrait;
use crate::output::diagnostic;
use crate::output::type_system::data;
use crate::parse::context;
use crate::parse::expressions;
use crate::parse::expressions::literals;
use crate::parse::expressions::references::mask;
use crate::parse::extensions;
use crate::parse::types;
use crate::util;
struct SourceInfo {
pub name: String,
pub data_type: Option<data::Type>,
}
fn parse_virtual_table(
x: &substrait::read_rel::VirtualTable,
y: &mut context::Context,
) -> diagnostic::Result<SourceInfo> {
let mut data_type: data::Type = Arc::default();
proto_repeated_field!(x, y, values, |x, y| {
let result = literals::parse_struct(x, y, false, None);
data_type = types::assert_equal(
y,
&y.data_type(),
&data_type,
"virtual table rows must have the same type",
);
result
});
describe!(y, Misc, "Virtual table");
Ok(SourceInfo {
name: String::from("virtual table"),
data_type: Some(data_type),
})
}
fn parse_path_type(
x: &substrait::read_rel::local_files::file_or_files::PathType,
y: &mut context::Context,
) -> diagnostic::Result<bool> {
use substrait::read_rel::local_files::file_or_files::PathType;
match x {
PathType::UriPath(x) => {
if let Err(e) = util::string::check_uri(x) {
diagnostic!(y, Error, e);
}
Ok(false)
}
PathType::UriPathGlob(x) => {
if let Err(e) = util::string::check_uri_glob(x) {
diagnostic!(y, Error, e);
}
Ok(true)
}
PathType::UriFile(x) => {
if let Err(e) = util::string::check_uri(x) {
diagnostic!(y, Error, e);
}
Ok(false)
}
PathType::UriFolder(x) => {
if let Err(e) = util::string::check_uri(x) {
diagnostic!(y, Error, e);
}
Ok(true)
}
}
}
fn parse_file_format(
x: &substrait::read_rel::local_files::file_or_files::FileFormat,
y: &mut context::Context,
) -> diagnostic::Result<Option<String>> {
Ok(match x {
substrait::read_rel::local_files::file_or_files::FileFormat::Parquet(_) => {
Some(String::from("Parquet"))
}
substrait::read_rel::local_files::file_or_files::FileFormat::Arrow(_) => {
Some(String::from("Arrow IPC"))
}
substrait::read_rel::local_files::file_or_files::FileFormat::Orc(_) => {
Some(String::from("Orc"))
}
substrait::read_rel::local_files::file_or_files::FileFormat::Dwrf(_) => {
Some(String::from("Dwrf"))
}
substrait::read_rel::local_files::file_or_files::FileFormat::Extension(x) => {
extensions::advanced::parse_functional_any(x, y)?;
None
}
substrait::read_rel::local_files::file_or_files::FileFormat::Text(_) => {
Some(String::from("Text"))
}
})
}
fn parse_file_or_files(
x: &substrait::read_rel::local_files::FileOrFiles,
y: &mut context::Context,
) -> diagnostic::Result<()> {
let multiple = proto_required_field!(x, y, path_type, parse_path_type)
.1
.unwrap_or_default();
let format = proto_required_field!(x, y, file_format, parse_file_format)
.1
.flatten();
proto_primitive_field!(x, y, partition_index);
proto_primitive_field!(x, y, start);
proto_primitive_field!(x, y, length);
if multiple && (x.start > 0 || x.length > 0) {
diagnostic!(
y,
Error,
IllegalValue,
"file offsets are not allowed in conjunction with multiple files"
);
}
if multiple {
describe!(y, Misc, "Multiple files");
} else {
describe!(y, Misc, "Single file");
}
summary!(y, "Read");
if x.partition_index != 0 {
summary!(y, "partition {}", x.partition_index);
}
summary!(y, "from");
if multiple {
summary!(y, "multiple");
} else {
if x.start > 0 {
if x.length > 0 {
summary!(y, "byte offset {} to {} of", x.start, x.start + x.length);
} else {
summary!(y, "byte offset {} to the end of", x.start);
}
} else if x.length > 0 {
summary!(y, "the first {} byte(s) of", x.length);
}
summary!(y, "a single");
}
if let Some(format) = format {
summary!(y, "{}", format);
}
if multiple {
summary!(y, "files");
} else {
summary!(y, "file");
}
Ok(())
}
fn parse_local_files(
x: &substrait::read_rel::LocalFiles,
y: &mut context::Context,
) -> diagnostic::Result<SourceInfo> {
proto_required_repeated_field!(x, y, items, parse_file_or_files, |_, _, _, _, _| (),);
proto_field!(
x,
y,
advanced_extension,
extensions::advanced::parse_advanced_extension
);
describe!(y, Misc, "Table from file(s)");
Ok(SourceInfo {
name: String::from("local files"),
data_type: None,
})
}
fn parse_named_table(
x: &substrait::read_rel::NamedTable,
y: &mut context::Context,
) -> diagnostic::Result<SourceInfo> {
proto_required_repeated_field!(x, y, names);
proto_field!(
x,
y,
advanced_extension,
extensions::advanced::parse_advanced_extension
);
let name = if x.names.is_empty() {
String::from("?")
} else {
if x.names.len() > 1 {
diagnostic!(
y,
Warning,
NotYetImplemented,
"named tables with multiple names"
);
}
util::string::as_ident_or_string(x.names.first().unwrap())
};
describe!(
y,
Misc,
"Named table {}",
util::string::as_ident_or_string(&name)
);
Ok(SourceInfo {
name,
data_type: None,
})
}
fn parse_extension_table(
x: &substrait::read_rel::ExtensionTable,
y: &mut context::Context,
) -> diagnostic::Result<SourceInfo> {
proto_required_field!(x, y, detail, extensions::advanced::parse_functional_any);
describe!(
y,
Misc,
"{} extension",
x.detail
.as_ref()
.map(|x| x.type_url.clone())
.unwrap_or_else(|| String::from("Unknown"))
);
Ok(SourceInfo {
name: x
.detail
.as_ref()
.map(|x| x.type_url.to_string())
.unwrap_or_else(|| String::from("extension")),
data_type: None,
})
}
fn parse_read_type(
x: &substrait::read_rel::ReadType,
y: &mut context::Context,
) -> diagnostic::Result<SourceInfo> {
match x {
substrait::read_rel::ReadType::VirtualTable(x) => parse_virtual_table(x, y),
substrait::read_rel::ReadType::LocalFiles(x) => parse_local_files(x, y),
substrait::read_rel::ReadType::NamedTable(x) => parse_named_table(x, y),
substrait::read_rel::ReadType::ExtensionTable(x) => parse_extension_table(x, y),
}
}
pub fn parse_read_rel(x: &substrait::ReadRel, y: &mut context::Context) -> diagnostic::Result<()> {
let source = proto_required_field!(x, y, read_type, parse_read_type)
.1
.unwrap_or(SourceInfo {
name: String::from("unknown source"),
data_type: None,
});
let schema = proto_required_field!(x, y, base_schema, types::parse_named_struct)
.0
.data_type
.clone();
let mut schema = match (source.data_type, schema) {
(Some(data_type), Some(schema)) => {
types::assert_equal(y, &schema, &data_type, "data differs from schema")
}
(Some(data_type), None) => data_type,
(None, Some(schema)) => schema,
(None, None) => Arc::default(),
};
if !schema.is_unresolved() && schema.nullable() {
diagnostic!(
y,
Error,
TypeMismatchedNullability,
"the outer struct representing a schema must not be nullable"
);
}
y.set_schema(schema.clone());
let filter = proto_boxed_field!(x, y, filter, expressions::parse_predicate);
let best_effort_filter =
proto_boxed_field!(x, y, best_effort_filter, expressions::parse_predicate);
if x.projection.is_some() {
schema =
proto_required_field!(x, y, projection, mask::parse_mask_expression, &schema, true)
.0
.data_type();
y.set_schema(schema.clone());
}
match (x.filter.is_some(), x.projection.is_some()) {
(false, false) => describe!(y, Relation, "Read from {}", source.name),
(false, true) => describe!(y, Relation, "Partial read from {}", source.name),
(true, false) => describe!(y, Relation, "Filtered read from {}", source.name),
(true, true) => describe!(y, Relation, "Filtered partial read from {}", source.name),
}
if let (Some(filter_node), Some(filter_expr)) = filter {
let nullable = filter_node.data_type().nullable();
summary!(
y,
"This relation discards all rows for which the expression {} yields {}.",
filter_expr,
if nullable { "false or null" } else { "false" }
);
}
if let (Some(best_effort_filter_node), Some(best_effort_filter_expr)) = best_effort_filter {
let nullable = best_effort_filter_node.data_type().nullable();
summary!(
y,
"This relation may discards all rows for which the expression {} yields {}.",
best_effort_filter_expr,
if nullable { "false or null" } else { "false" }
);
}
handle_rel_common!(x, y);
handle_advanced_extension!(x, y);
Ok(())
}