1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::hash::Hash;
7
8use itertools::Itertools as _;
9use prost::Message;
10use vortex_dtype::DType;
11use vortex_dtype::FieldName;
12use vortex_dtype::FieldNames;
13use vortex_dtype::Nullability;
14use vortex_dtype::StructFields;
15use vortex_error::VortexResult;
16use vortex_proto::expr as pb;
17use vortex_session::VortexSession;
18
19use crate::ArrayRef;
20use crate::IntoArray;
21use crate::arrays::StructArray;
22use crate::expr::Arity;
23use crate::expr::ChildName;
24use crate::expr::ExecutionArgs;
25use crate::expr::ExprId;
26use crate::expr::Expression;
27use crate::expr::VTable;
28use crate::expr::VTableExt;
29use crate::expr::lit;
30use crate::validity::Validity;
31
/// Expression that packs the values of its child expressions into a struct array, with one
/// struct field per child (see the `VTable` implementation and the [`pack`] constructor).
pub struct Pack;
34
/// Options for a [`Pack`] expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PackOptions {
    /// Names of the fields of the packed struct, one per child expression, in child order.
    /// The expression's arity is exactly `names.len()`.
    pub names: FieldNames,
    /// Nullability of the produced struct dtype; also used to derive the struct array's
    /// validity when the expression is executed.
    pub nullability: Nullability,
}
40
41impl Display for PackOptions {
42 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43 write!(
44 f,
45 "names: [{}], nullability: {:#}",
46 self.names.iter().join(", "),
47 self.nullability
48 )
49 }
50}
51
52impl VTable for Pack {
53 type Options = PackOptions;
54
55 fn id(&self) -> ExprId {
56 ExprId::new_ref("vortex.pack")
57 }
58
59 fn serialize(&self, instance: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
60 Ok(Some(
61 pb::PackOpts {
62 paths: instance.names.iter().map(|n| n.to_string()).collect(),
63 nullable: instance.nullability.into(),
64 }
65 .encode_to_vec(),
66 ))
67 }
68
69 fn deserialize(
70 &self,
71 _metadata: &[u8],
72 _session: &VortexSession,
73 ) -> VortexResult<Self::Options> {
74 let opts = pb::PackOpts::decode(_metadata)?;
75 let names: FieldNames = opts
76 .paths
77 .iter()
78 .map(|name| FieldName::from(name.as_str()))
79 .collect();
80 Ok(PackOptions {
81 names,
82 nullability: opts.nullable.into(),
83 })
84 }
85
86 fn arity(&self, options: &Self::Options) -> Arity {
87 Arity::Exact(options.names.len())
88 }
89
90 fn child_name(&self, instance: &Self::Options, child_idx: usize) -> ChildName {
91 match instance.names.get(child_idx) {
92 Some(name) => ChildName::from(name.inner().clone()),
93 None => unreachable!(
94 "Invalid child index {} for Pack expression with {} fields",
95 child_idx,
96 instance.names.len()
97 ),
98 }
99 }
100
101 fn fmt_sql(
102 &self,
103 options: &Self::Options,
104 expr: &Expression,
105 f: &mut Formatter<'_>,
106 ) -> std::fmt::Result {
107 write!(f, "pack(")?;
108 for (i, (name, child)) in options.names.iter().zip(expr.children().iter()).enumerate() {
109 write!(f, "{}: ", name)?;
110 child.fmt_sql(f)?;
111 if i + 1 < options.names.len() {
112 write!(f, ", ")?;
113 }
114 }
115 write!(f, "){}", options.nullability)
116 }
117
118 fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult<DType> {
119 Ok(DType::Struct(
120 StructFields::new(options.names.clone(), arg_dtypes.to_vec()),
121 options.nullability,
122 ))
123 }
124
125 fn validity(
126 &self,
127 _options: &Self::Options,
128 _expression: &Expression,
129 ) -> VortexResult<Option<Expression>> {
130 Ok(Some(lit(true)))
131 }
132
133 fn execute(&self, options: &Self::Options, args: ExecutionArgs) -> VortexResult<ArrayRef> {
134 let len = args.row_count;
135 let value_arrays = args.inputs;
136 let validity: Validity = options.nullability.into();
137 StructArray::try_new(options.names.clone(), value_arrays, len, validity)?
138 .into_array()
139 .execute(args.ctx)
140 }
141
142 fn is_null_sensitive(&self, _instance: &Self::Options) -> bool {
144 true
145 }
146
147 fn is_fallible(&self, _instance: &Self::Options) -> bool {
148 false
149 }
150}
151
152pub fn pack(
160 elements: impl IntoIterator<Item = (impl Into<FieldName>, Expression)>,
161 nullability: Nullability,
162) -> Expression {
163 let (names, values): (Vec<_>, Vec<_>) = elements
164 .into_iter()
165 .map(|(name, value)| (name.into(), value))
166 .unzip();
167 Pack.new_expr(
168 PackOptions {
169 names: names.into(),
170 nullability,
171 },
172 values,
173 )
174}
175
#[cfg(test)]
mod tests {
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability;
    use vortex_error::VortexResult;
    use vortex_error::vortex_bail;

    use super::Pack;
    use super::PackOptions;
    use super::pack;
    use crate::Array;
    use crate::ArrayRef;
    use crate::IntoArray;
    use crate::ToCanonical;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::StructArray;
    use crate::assert_arrays_eq;
    use crate::expr::Expression;
    use crate::expr::VTableExt;
    use crate::expr::exprs::get_item::col;
    use crate::validity::Validity;
    use crate::vtable::ValidityHelper;

    /// Three-row struct `{a: [0, 1, 2], b: [4, 5, 6]}` used as the input of every test.
    fn test_array() -> ArrayRef {
        let fields = [
            ("a", buffer![0, 1, 2].into_array()),
            ("b", buffer![4, 5, 6].into_array()),
        ];
        StructArray::from_fields(&fields).unwrap().into_array()
    }

    /// Follows `field_path` through nested structs and returns the leaf as a primitive array.
    fn primitive_field(array: &dyn Array, field_path: &[&str]) -> VortexResult<PrimitiveArray> {
        let Some((head, tail)) = field_path.split_first() else {
            vortex_bail!("empty field path");
        };

        let mut current = array.to_struct().unmasked_field_by_name(head)?.clone();
        for segment in tail {
            current = current.to_struct().unmasked_field_by_name(segment)?.clone();
        }
        Ok(current.to_primitive())
    }

    /// `pack(one: $.a, two: $.b, three: $.a)` with the requested nullability.
    fn one_two_three_pack(nullability: Nullability) -> Expression {
        Pack.new_expr(
            PackOptions {
                names: ["one", "two", "three"].into(),
                nullability,
            },
            [col("a"), col("b"), col("a")],
        )
    }

    #[test]
    pub fn test_empty_pack() {
        let options = PackOptions {
            names: Default::default(),
            nullability: Default::default(),
        };
        let expr = Pack.new_expr(options, []);

        let input = test_array();
        let output = input.clone().apply(&expr).unwrap();
        assert_eq!(output.len(), input.len());
        assert_eq!(output.to_struct().struct_fields().nfields(), 0);
    }

    #[test]
    pub fn test_simple_pack() {
        let expr = one_two_three_pack(Nullability::NonNullable);
        let packed = test_array().apply(&expr).unwrap().to_struct();

        assert_eq!(packed.names(), ["one", "two", "three"]);
        assert_eq!(packed.validity(), &Validity::NonNullable);

        let expectations: [(&str, [i32; 3]); 3] =
            [("one", [0, 1, 2]), ("two", [4, 5, 6]), ("three", [0, 1, 2])];
        for (field, expected) in expectations {
            assert_arrays_eq!(
                primitive_field(packed.as_ref(), &[field]).unwrap(),
                PrimitiveArray::from_iter(expected)
            );
        }
    }

    #[test]
    pub fn test_nested_pack() {
        let inner = Pack.new_expr(
            PackOptions {
                names: ["two_one", "two_two"].into(),
                nullability: Nullability::NonNullable,
            },
            [col("b"), col("b")],
        );
        let expr = Pack.new_expr(
            PackOptions {
                names: ["one", "two", "three"].into(),
                nullability: Nullability::NonNullable,
            },
            [col("a"), inner, col("a")],
        );

        let packed = test_array().apply(&expr).unwrap().to_struct();

        assert_eq!(packed.names(), ["one", "two", "three"]);

        let expectations: [(&[&str], [i32; 3]); 4] = [
            (&["one"], [0, 1, 2]),
            (&["two", "two_one"], [4, 5, 6]),
            (&["two", "two_two"], [4, 5, 6]),
            (&["three"], [0, 1, 2]),
        ];
        for (path, expected) in expectations {
            assert_arrays_eq!(
                primitive_field(packed.as_ref(), path).unwrap(),
                PrimitiveArray::from_iter(expected)
            );
        }
    }

    #[test]
    pub fn test_pack_nullable() {
        let expr = one_two_three_pack(Nullability::Nullable);
        let packed = test_array().apply(&expr).unwrap().to_struct();

        assert_eq!(packed.names(), ["one", "two", "three"]);
        assert_eq!(packed.validity(), &Validity::AllValid);
    }

    #[test]
    pub fn test_display() {
        let non_nullable = pack(
            [("id", col("user_id")), ("name", col("username"))],
            Nullability::NonNullable,
        );
        assert_eq!(
            non_nullable.to_string(),
            "pack(id: $.user_id, name: $.username)"
        );

        let nullable = Pack.new_expr(
            PackOptions {
                names: ["x", "y"].into(),
                nullability: Nullability::Nullable,
            },
            [col("a"), col("b")],
        );
        assert_eq!(nullable.to_string(), "pack(x: $.a, y: $.b)?");
    }
}