1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use super::{ExecutorResult, ExecutorTable, ScanProjection};
use arrow::datatypes::{DataType, Field, Schema};
use llkv_expr::expr::ScalarExpr;
use llkv_expr::literal::Literal;
use llkv_result::Error;
use llkv_storage::pager::Pager;
use llkv_table::types::FieldId;
use simd_r_drive_entry_handle::EntryHandle;
use std::collections::HashMap;
use std::sync::Arc;
pub fn schema_for_projections<P>(
table: &ExecutorTable<P>,
projections: &[ScanProjection],
) -> ExecutorResult<Arc<Schema>>
where
P: Pager<Blob = EntryHandle> + Send + Sync,
{
let mut fields: Vec<Field> = Vec::with_capacity(projections.len());
for projection in projections {
match projection {
ScanProjection::Column(proj) => {
let field_id = proj.logical_field_id.field_id();
let column = table.schema.column_by_field_id(field_id).ok_or_else(|| {
Error::InvalidArgumentError(format!(
"unknown column with field id {} in projection",
field_id
))
})?;
let name = proj.alias.clone().unwrap_or_else(|| column.name.clone());
let mut metadata = HashMap::new();
metadata.insert(
llkv_table::constants::FIELD_ID_META_KEY.to_string(),
column.field_id.to_string(),
);
let field = Field::new(&name, column.data_type.clone(), column.nullable)
.with_metadata(metadata);
fields.push(field);
}
ScanProjection::Computed { alias, expr } => {
let dtype = match expr {
ScalarExpr::Literal(Literal::Integer(_)) => DataType::Int64,
ScalarExpr::Literal(Literal::Float(_)) => DataType::Float64,
ScalarExpr::Literal(Literal::Boolean(_)) => DataType::Boolean,
ScalarExpr::Literal(Literal::String(_)) => DataType::Utf8,
ScalarExpr::Literal(Literal::Null) => DataType::Null,
ScalarExpr::Literal(Literal::Struct(_)) => {
// For struct literals, we need to infer the type
// For now, create a simple struct type
// In a real implementation, we'd recursively determine field types
DataType::Utf8 // Placeholder - should be DataType::Struct
}
ScalarExpr::Column(field_id) => {
let column =
table.schema.column_by_field_id(*field_id).ok_or_else(|| {
Error::InvalidArgumentError(format!(
"unknown column with field id {} in computed projection",
field_id
))
})?;
column.data_type.clone()
}
ScalarExpr::Binary { .. } => DataType::Float64,
ScalarExpr::Aggregate(_) => DataType::Int64, // Aggregates return Int64
ScalarExpr::GetField { base, field_name } => {
// Recursively determine the data type of the field being accessed
fn get_field_type<P>(
expr: &ScalarExpr<FieldId>,
field_name: &str,
table: &ExecutorTable<P>,
) -> ExecutorResult<DataType>
where
P: Pager<Blob = EntryHandle> + Send + Sync,
{
let base_dtype = match expr {
ScalarExpr::Column(fid) => {
let column =
table.schema.column_by_field_id(*fid).ok_or_else(|| {
Error::InvalidArgumentError(format!(
"unknown column with field id {} in GetField",
fid
))
})?;
column.data_type.clone()
}
ScalarExpr::GetField {
base: inner_base,
field_name: inner_field,
} => get_field_type(inner_base, inner_field, table)?,
_ => {
return Err(Error::InvalidArgumentError(
"GetField base must be a column or another GetField".into(),
));
}
};
if let DataType::Struct(fields) = base_dtype {
fields
.iter()
.find(|f| f.name() == field_name)
.map(|f| f.data_type().clone())
.ok_or_else(|| {
Error::InvalidArgumentError(format!(
"Field '{}' not found in struct",
field_name
))
})
} else {
Err(Error::InvalidArgumentError(
"GetField can only be applied to struct types".into(),
))
}
}
get_field_type(base, field_name, table)?
}
};
let field = Field::new(alias, dtype, true);
fields.push(field);
}
}
}
Ok(Arc::new(Schema::new(fields)))
}