// wp_arrow/convert.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use arrow::array::{
5    Array, BooleanArray, Float64Array, Int64Array, ListArray, StringArray, TimestampNanosecondArray,
6};
7use arrow::array::{
8    ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, ListBuilder, RecordBatch,
9    StringBuilder, TimestampNanosecondBuilder,
10};
11use chrono::DateTime;
12
13use wp_model_core::model::{
14    DataRecord, DataType, FValueStr, Field, FieldStorage, HexT, IpNetValue, Value,
15};
16
17use crate::error::WpArrowError;
18use crate::schema::{FieldDef, WpDataType, to_arrow_schema};
19
20/// Convert row-oriented DataRecords to a columnar Arrow RecordBatch.
21///
22/// Schema is driven by `field_defs`. For each FieldDef, the corresponding value
23/// is looked up by name in every record. Missing nullable fields become null;
24/// missing non-nullable fields produce an error.
25pub fn records_to_batch(
26    records: &[DataRecord],
27    field_defs: &[FieldDef],
28) -> Result<RecordBatch, WpArrowError> {
29    let schema = to_arrow_schema(field_defs)?;
30    let columns: Vec<ArrayRef> = field_defs
31        .iter()
32        .map(|fd| build_column(fd, records))
33        .collect::<Result<_, _>>()?;
34    RecordBatch::try_new(Arc::new(schema), columns)
35        .map_err(|e| WpArrowError::ArrowBuildError(e.to_string()))
36}
37
38/// Convert a columnar Arrow RecordBatch back to row-oriented DataRecords.
39///
40/// `field_defs` provides WpDataType metadata for distinguishing Arrow Utf8 columns
41/// (which may represent Chars, Ip, or Hex). Record IDs are set to sequential row indices.
42pub fn batch_to_records(
43    batch: &RecordBatch,
44    field_defs: &[FieldDef],
45) -> Result<Vec<DataRecord>, WpArrowError> {
46    if field_defs.len() != batch.num_columns() {
47        return Err(WpArrowError::SchemaMismatch {
48            expected: field_defs.len(),
49            actual: batch.num_columns(),
50        });
51    }
52
53    let num_rows = batch.num_rows();
54    let mut records = Vec::with_capacity(num_rows);
55
56    for row_idx in 0..num_rows {
57        let mut items = Vec::with_capacity(field_defs.len());
58        for (col_idx, fd) in field_defs.iter().enumerate() {
59            let col = batch.column(col_idx);
60            if col.is_null(row_idx) {
61                continue;
62            }
63            let value = extract_value(col, row_idx, &fd.data_type, &fd.name)?;
64            let meta = wp_type_to_model_meta(&fd.data_type);
65            let field = Field::new(meta, fd.name.as_str(), value);
66            items.push(FieldStorage::from_owned(field));
67        }
68        let mut record = DataRecord::from(items);
69        record.id = row_idx as u64;
70        records.push(record);
71    }
72
73    Ok(records)
74}
75
76// ---------------------------------------------------------------------------
77// Internal helpers for records_to_batch
78// ---------------------------------------------------------------------------
79
80fn build_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
81    match &fd.data_type {
82        WpDataType::Chars | WpDataType::Ip | WpDataType::Hex => build_string_column(fd, records),
83        WpDataType::Digit => build_digit_column(fd, records),
84        WpDataType::Float => build_float_column(fd, records),
85        WpDataType::Bool => build_bool_column(fd, records),
86        WpDataType::Time => build_time_column(fd, records),
87        WpDataType::Array(inner) => build_list_column(fd, records, inner),
88    }
89}
90
91fn build_string_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
92    let mut builder = StringBuilder::with_capacity(records.len(), records.len() * 32);
93    for rec in records {
94        match rec.get_value(&fd.name) {
95            Some(Value::Null) | None => {
96                handle_null(&mut builder, fd, |b| b.append_null())?;
97            }
98            Some(val) => {
99                let s = value_to_string(val, &fd.data_type, &fd.name)?;
100                builder.append_value(&s);
101            }
102        }
103    }
104    Ok(Arc::new(builder.finish()))
105}
106
107fn build_digit_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
108    let mut builder = Int64Builder::with_capacity(records.len());
109    for rec in records {
110        match rec.get_value(&fd.name) {
111            Some(Value::Null) | None => {
112                handle_null(&mut builder, fd, |b| b.append_null())?;
113            }
114            Some(Value::Digit(v)) => builder.append_value(*v),
115            Some(other) => {
116                return Err(WpArrowError::ValueConversionError {
117                    field_name: fd.name.clone(),
118                    expected: "Digit".to_string(),
119                    actual: other.tag().to_string(),
120                });
121            }
122        }
123    }
124    Ok(Arc::new(builder.finish()))
125}
126
127fn build_float_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
128    let mut builder = Float64Builder::with_capacity(records.len());
129    for rec in records {
130        match rec.get_value(&fd.name) {
131            Some(Value::Null) | None => {
132                handle_null(&mut builder, fd, |b| b.append_null())?;
133            }
134            Some(Value::Float(v)) => builder.append_value(*v),
135            Some(other) => {
136                return Err(WpArrowError::ValueConversionError {
137                    field_name: fd.name.clone(),
138                    expected: "Float".to_string(),
139                    actual: other.tag().to_string(),
140                });
141            }
142        }
143    }
144    Ok(Arc::new(builder.finish()))
145}
146
147fn build_bool_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
148    let mut builder = BooleanBuilder::with_capacity(records.len());
149    for rec in records {
150        match rec.get_value(&fd.name) {
151            Some(Value::Null) | None => {
152                handle_null(&mut builder, fd, |b| b.append_null())?;
153            }
154            Some(Value::Bool(v)) => builder.append_value(*v),
155            Some(other) => {
156                return Err(WpArrowError::ValueConversionError {
157                    field_name: fd.name.clone(),
158                    expected: "Bool".to_string(),
159                    actual: other.tag().to_string(),
160                });
161            }
162        }
163    }
164    Ok(Arc::new(builder.finish()))
165}
166
167fn build_time_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
168    let mut builder = TimestampNanosecondBuilder::with_capacity(records.len());
169    for rec in records {
170        match rec.get_value(&fd.name) {
171            Some(Value::Null) | None => {
172                handle_null(&mut builder, fd, |b| b.append_null())?;
173            }
174            Some(Value::Time(ndt)) => {
175                let nanos = ndt.and_utc().timestamp_nanos_opt().ok_or_else(|| {
176                    WpArrowError::TimestampOverflow {
177                        field_name: fd.name.clone(),
178                    }
179                })?;
180                builder.append_value(nanos);
181            }
182            Some(other) => {
183                return Err(WpArrowError::ValueConversionError {
184                    field_name: fd.name.clone(),
185                    expected: "Time".to_string(),
186                    actual: other.tag().to_string(),
187                });
188            }
189        }
190    }
191    Ok(Arc::new(builder.finish()))
192}
193
194fn build_list_column(
195    fd: &FieldDef,
196    records: &[DataRecord],
197    inner_type: &WpDataType,
198) -> Result<ArrayRef, WpArrowError> {
199    match inner_type {
200        WpDataType::Chars | WpDataType::Ip | WpDataType::Hex => {
201            build_list_string(fd, records, inner_type)
202        }
203        WpDataType::Digit => build_list_digit(fd, records),
204        WpDataType::Float => build_list_float(fd, records),
205        WpDataType::Bool => build_list_bool(fd, records),
206        WpDataType::Time => build_list_time(fd, records),
207        WpDataType::Array(_) => Err(WpArrowError::UnsupportedDataType(
208            "nested array<array<...>> not supported".to_string(),
209        )),
210    }
211}
212
213fn build_list_string(
214    fd: &FieldDef,
215    records: &[DataRecord],
216    inner_type: &WpDataType,
217) -> Result<ArrayRef, WpArrowError> {
218    let mut builder = ListBuilder::new(StringBuilder::new());
219    for rec in records {
220        match rec.get_value(&fd.name) {
221            Some(Value::Null) | None => {
222                handle_null(&mut builder, fd, |b| b.append_null())?;
223            }
224            Some(Value::Array(items)) => {
225                for item in items {
226                    let val = item.get_value();
227                    if matches!(val, Value::Null) {
228                        builder.values().append_null();
229                    } else {
230                        let s = value_to_string(val, inner_type, &fd.name)?;
231                        builder.values().append_value(&s);
232                    }
233                }
234                builder.append(true);
235            }
236            Some(other) => {
237                return Err(WpArrowError::ValueConversionError {
238                    field_name: fd.name.clone(),
239                    expected: "Array".to_string(),
240                    actual: other.tag().to_string(),
241                });
242            }
243        }
244    }
245    Ok(Arc::new(builder.finish()))
246}
247
248fn build_list_digit(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
249    let mut builder = ListBuilder::new(Int64Builder::new());
250    for rec in records {
251        match rec.get_value(&fd.name) {
252            Some(Value::Null) | None => {
253                handle_null(&mut builder, fd, |b| b.append_null())?;
254            }
255            Some(Value::Array(items)) => {
256                for item in items {
257                    match item.get_value() {
258                        Value::Digit(v) => builder.values().append_value(*v),
259                        Value::Null => builder.values().append_null(),
260                        other => {
261                            return Err(WpArrowError::ValueConversionError {
262                                field_name: fd.name.clone(),
263                                expected: "Digit".to_string(),
264                                actual: other.tag().to_string(),
265                            });
266                        }
267                    }
268                }
269                builder.append(true);
270            }
271            Some(other) => {
272                return Err(WpArrowError::ValueConversionError {
273                    field_name: fd.name.clone(),
274                    expected: "Array".to_string(),
275                    actual: other.tag().to_string(),
276                });
277            }
278        }
279    }
280    Ok(Arc::new(builder.finish()))
281}
282
283fn build_list_float(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
284    let mut builder = ListBuilder::new(Float64Builder::new());
285    for rec in records {
286        match rec.get_value(&fd.name) {
287            Some(Value::Null) | None => {
288                handle_null(&mut builder, fd, |b| b.append_null())?;
289            }
290            Some(Value::Array(items)) => {
291                for item in items {
292                    match item.get_value() {
293                        Value::Float(v) => builder.values().append_value(*v),
294                        Value::Null => builder.values().append_null(),
295                        other => {
296                            return Err(WpArrowError::ValueConversionError {
297                                field_name: fd.name.clone(),
298                                expected: "Float".to_string(),
299                                actual: other.tag().to_string(),
300                            });
301                        }
302                    }
303                }
304                builder.append(true);
305            }
306            Some(other) => {
307                return Err(WpArrowError::ValueConversionError {
308                    field_name: fd.name.clone(),
309                    expected: "Array".to_string(),
310                    actual: other.tag().to_string(),
311                });
312            }
313        }
314    }
315    Ok(Arc::new(builder.finish()))
316}
317
318fn build_list_bool(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
319    let mut builder = ListBuilder::new(BooleanBuilder::new());
320    for rec in records {
321        match rec.get_value(&fd.name) {
322            Some(Value::Null) | None => {
323                handle_null(&mut builder, fd, |b| b.append_null())?;
324            }
325            Some(Value::Array(items)) => {
326                for item in items {
327                    match item.get_value() {
328                        Value::Bool(v) => builder.values().append_value(*v),
329                        Value::Null => builder.values().append_null(),
330                        other => {
331                            return Err(WpArrowError::ValueConversionError {
332                                field_name: fd.name.clone(),
333                                expected: "Bool".to_string(),
334                                actual: other.tag().to_string(),
335                            });
336                        }
337                    }
338                }
339                builder.append(true);
340            }
341            Some(other) => {
342                return Err(WpArrowError::ValueConversionError {
343                    field_name: fd.name.clone(),
344                    expected: "Array".to_string(),
345                    actual: other.tag().to_string(),
346                });
347            }
348        }
349    }
350    Ok(Arc::new(builder.finish()))
351}
352
353fn build_list_time(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
354    let mut builder = ListBuilder::new(TimestampNanosecondBuilder::new());
355    for rec in records {
356        match rec.get_value(&fd.name) {
357            Some(Value::Null) | None => {
358                handle_null(&mut builder, fd, |b| b.append_null())?;
359            }
360            Some(Value::Array(items)) => {
361                for item in items {
362                    match item.get_value() {
363                        Value::Time(ndt) => {
364                            let nanos = ndt.and_utc().timestamp_nanos_opt().ok_or_else(|| {
365                                WpArrowError::TimestampOverflow {
366                                    field_name: fd.name.clone(),
367                                }
368                            })?;
369                            builder.values().append_value(nanos);
370                        }
371                        Value::Null => builder.values().append_null(),
372                        other => {
373                            return Err(WpArrowError::ValueConversionError {
374                                field_name: fd.name.clone(),
375                                expected: "Time".to_string(),
376                                actual: other.tag().to_string(),
377                            });
378                        }
379                    }
380                }
381                builder.append(true);
382            }
383            Some(other) => {
384                return Err(WpArrowError::ValueConversionError {
385                    field_name: fd.name.clone(),
386                    expected: "Array".to_string(),
387                    actual: other.tag().to_string(),
388                });
389            }
390        }
391    }
392    Ok(Arc::new(builder.finish()))
393}
394
395/// Convert a Value to its string representation for Arrow Utf8 columns.
396fn value_to_string(
397    val: &Value,
398    wp_type: &WpDataType,
399    field_name: &str,
400) -> Result<String, WpArrowError> {
401    match (wp_type, val) {
402        // Chars accepts any text-like Value
403        (WpDataType::Chars, Value::Chars(s)) => Ok(s.to_string()),
404        (WpDataType::Chars, Value::Domain(d)) => Ok(d.to_string()),
405        (WpDataType::Chars, Value::Url(u)) => Ok(u.to_string()),
406        (WpDataType::Chars, Value::Email(e)) => Ok(e.to_string()),
407        // Ip accepts IpAddr, IpNet, and Chars fallback
408        (WpDataType::Ip, Value::IpAddr(ip)) => Ok(ip.to_string()),
409        (WpDataType::Ip, Value::IpNet(net)) => Ok(net.to_string()),
410        (WpDataType::Ip, Value::Chars(s)) => Ok(s.to_string()),
411        // Hex
412        (WpDataType::Hex, Value::Hex(h)) => Ok(format!("{:#X}", h.0)),
413        _ => Err(WpArrowError::ValueConversionError {
414            field_name: field_name.to_string(),
415            expected: format!("{:?}", wp_type),
416            actual: val.tag().to_string(),
417        }),
418    }
419}
420
421/// Handle null/missing values: append null if nullable, error if required.
422fn handle_null<B, F>(builder: &mut B, fd: &FieldDef, append_null: F) -> Result<(), WpArrowError>
423where
424    F: FnOnce(&mut B),
425{
426    if fd.nullable {
427        append_null(builder);
428        Ok(())
429    } else {
430        Err(WpArrowError::MissingRequiredField {
431            field_name: fd.name.clone(),
432        })
433    }
434}
435
436// ---------------------------------------------------------------------------
437// Internal helpers for batch_to_records
438// ---------------------------------------------------------------------------
439
440/// Extract a Value from an Arrow array column at the given row index.
441fn extract_value(
442    col: &ArrayRef,
443    row_idx: usize,
444    wp_type: &WpDataType,
445    field_name: &str,
446) -> Result<Value, WpArrowError> {
447    match wp_type {
448        WpDataType::Chars => {
449            let arr = col
450                .as_any()
451                .downcast_ref::<StringArray>()
452                .ok_or_else(|| WpArrowError::ArrowBuildError("expected StringArray".to_string()))?;
453            Ok(Value::Chars(FValueStr::from(arr.value(row_idx))))
454        }
455        WpDataType::Digit => {
456            let arr = col
457                .as_any()
458                .downcast_ref::<Int64Array>()
459                .ok_or_else(|| WpArrowError::ArrowBuildError("expected Int64Array".to_string()))?;
460            Ok(Value::Digit(arr.value(row_idx)))
461        }
462        WpDataType::Float => {
463            let arr = col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
464                WpArrowError::ArrowBuildError("expected Float64Array".to_string())
465            })?;
466            Ok(Value::Float(arr.value(row_idx)))
467        }
468        WpDataType::Bool => {
469            let arr = col.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
470                WpArrowError::ArrowBuildError("expected BooleanArray".to_string())
471            })?;
472            Ok(Value::Bool(arr.value(row_idx)))
473        }
474        WpDataType::Time => {
475            let arr = col
476                .as_any()
477                .downcast_ref::<TimestampNanosecondArray>()
478                .ok_or_else(|| {
479                    WpArrowError::ArrowBuildError("expected TimestampNanosecondArray".to_string())
480                })?;
481            let nanos = arr.value(row_idx);
482            let ndt = DateTime::from_timestamp_nanos(nanos).naive_utc();
483            Ok(Value::Time(ndt))
484        }
485        WpDataType::Ip => {
486            let arr = col
487                .as_any()
488                .downcast_ref::<StringArray>()
489                .ok_or_else(|| WpArrowError::ArrowBuildError("expected StringArray".to_string()))?;
490            let s = arr.value(row_idx);
491            Ok(parse_ip_value(s, field_name)?)
492        }
493        WpDataType::Hex => {
494            let arr = col
495                .as_any()
496                .downcast_ref::<StringArray>()
497                .ok_or_else(|| WpArrowError::ArrowBuildError("expected StringArray".to_string()))?;
498            let s = arr.value(row_idx);
499            Ok(parse_hex_value(s, field_name)?)
500        }
501        WpDataType::Array(inner) => {
502            let arr = col
503                .as_any()
504                .downcast_ref::<ListArray>()
505                .ok_or_else(|| WpArrowError::ArrowBuildError("expected ListArray".to_string()))?;
506            let inner_arr = arr.value(row_idx);
507            let inner_meta = wp_type_to_model_meta(inner);
508            let mut items = Vec::new();
509            for i in 0..inner_arr.len() {
510                if inner_arr.is_null(i) {
511                    items.push(FieldStorage::from_owned(Field::new(
512                        inner_meta.clone(),
513                        "item",
514                        Value::Null,
515                    )));
516                } else {
517                    let val = extract_value(&inner_arr, i, inner, field_name)?;
518                    items.push(FieldStorage::from_owned(Field::new(
519                        inner_meta.clone(),
520                        "item",
521                        val,
522                    )));
523                }
524            }
525            Ok(Value::Array(items))
526        }
527    }
528}
529
530/// Parse a string as an IP address or CIDR network.
531fn parse_ip_value(s: &str, field_name: &str) -> Result<Value, WpArrowError> {
532    if s.contains('/') {
533        // Try parsing as IpNet (CIDR notation)
534        let parts: Vec<&str> = s.splitn(2, '/').collect();
535        let addr: IpAddr = parts[0].parse().map_err(|e| WpArrowError::ParseError {
536            field_name: field_name.to_string(),
537            detail: format!("invalid IP address: {e}"),
538        })?;
539        let prefix: u8 = parts[1].parse().map_err(|e| WpArrowError::ParseError {
540            field_name: field_name.to_string(),
541            detail: format!("invalid prefix length: {e}"),
542        })?;
543        let net = IpNetValue::new(addr, prefix).ok_or_else(|| WpArrowError::ParseError {
544            field_name: field_name.to_string(),
545            detail: format!("invalid prefix length {prefix} for {addr}"),
546        })?;
547        Ok(Value::IpNet(net))
548    } else {
549        // Try parsing as plain IpAddr
550        let addr: IpAddr = s.parse().map_err(|e| WpArrowError::ParseError {
551            field_name: field_name.to_string(),
552            detail: format!("invalid IP address: {e}"),
553        })?;
554        Ok(Value::IpAddr(addr))
555    }
556}
557
558/// Parse a hex string (with optional 0x/0X prefix) into a HexT value.
559fn parse_hex_value(s: &str, field_name: &str) -> Result<Value, WpArrowError> {
560    let hex_str = s
561        .strip_prefix("0x")
562        .or_else(|| s.strip_prefix("0X"))
563        .unwrap_or(s);
564    let v = u128::from_str_radix(hex_str, 16).map_err(|e| WpArrowError::ParseError {
565        field_name: field_name.to_string(),
566        detail: format!("invalid hex: {e}"),
567    })?;
568    Ok(Value::Hex(HexT(v)))
569}
570
571/// Map a WpDataType to the corresponding wp-model-core DataType for Field.meta.
572fn wp_type_to_model_meta(wp_type: &WpDataType) -> DataType {
573    match wp_type {
574        WpDataType::Chars => DataType::Chars,
575        WpDataType::Digit => DataType::Digit,
576        WpDataType::Float => DataType::Float,
577        WpDataType::Bool => DataType::Bool,
578        WpDataType::Time => DataType::Time,
579        WpDataType::Ip => DataType::IP,
580        WpDataType::Hex => DataType::Hex,
581        WpDataType::Array(inner) => {
582            let inner_name = match inner.as_ref() {
583                WpDataType::Chars => "chars",
584                WpDataType::Digit => "digit",
585                WpDataType::Float => "float",
586                WpDataType::Bool => "bool",
587                WpDataType::Time => "time",
588                WpDataType::Ip => "ip",
589                WpDataType::Hex => "hex",
590                WpDataType::Array(_) => "array",
591            };
592            DataType::Array(inner_name.to_string())
593        }
594    }
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600    use crate::schema::{FieldDef, WpDataType};
601    use arrow::array::AsArray;
602    use chrono::NaiveDateTime;
603    use std::net::{IpAddr, Ipv4Addr};
604    use wp_model_core::model::{DataField, DataRecord, Field, Value};
605
606    // Helper to build a DataRecord from a list of Fields
607    fn make_record(fields: Vec<DataField>) -> DataRecord {
608        DataRecord::from(fields)
609    }
610
611    // =======================================================================
612    // records_to_batch tests
613    // =======================================================================
614
615    #[test]
616    fn r2b_basic_types() {
617        let fds = vec![
618            FieldDef::new("name", WpDataType::Chars),
619            FieldDef::new("count", WpDataType::Digit),
620            FieldDef::new("ratio", WpDataType::Float),
621            FieldDef::new("active", WpDataType::Bool),
622        ];
623        let records = vec![
624            make_record(vec![
625                Field::from_chars("name", "Alice"),
626                Field::from_digit("count", 10),
627                Field::from_float("ratio", 1.5),
628                Field::from_bool("active", true),
629            ]),
630            make_record(vec![
631                Field::from_chars("name", "Bob"),
632                Field::from_digit("count", 20),
633                Field::from_float("ratio", 2.5),
634                Field::from_bool("active", false),
635            ]),
636        ];
637
638        let batch = records_to_batch(&records, &fds).unwrap();
639        assert_eq!(batch.num_columns(), 4);
640        assert_eq!(batch.num_rows(), 2);
641
642        let names = batch.column(0).as_string::<i32>();
643        assert_eq!(names.value(0), "Alice");
644        assert_eq!(names.value(1), "Bob");
645
646        let counts = batch
647            .column(1)
648            .as_primitive::<arrow::datatypes::Int64Type>();
649        assert_eq!(counts.value(0), 10);
650        assert_eq!(counts.value(1), 20);
651
652        let ratios = batch
653            .column(2)
654            .as_primitive::<arrow::datatypes::Float64Type>();
655        assert!((ratios.value(0) - 1.5).abs() < f64::EPSILON);
656
657        let actives = batch.column(3).as_boolean();
658        assert!(actives.value(0));
659        assert!(!actives.value(1));
660    }
661
662    #[test]
663    fn r2b_time_field() {
664        let fds = vec![FieldDef::new("ts", WpDataType::Time)];
665        let ndt =
666            NaiveDateTime::parse_from_str("2024-06-15 12:30:00", "%Y-%m-%d %H:%M:%S").unwrap();
667        let records = vec![make_record(vec![Field::from_time("ts", ndt)])];
668
669        let batch = records_to_batch(&records, &fds).unwrap();
670        let arr = batch
671            .column(0)
672            .as_any()
673            .downcast_ref::<TimestampNanosecondArray>()
674            .unwrap();
675        let expected_nanos = ndt.and_utc().timestamp_nanos_opt().unwrap();
676        assert_eq!(arr.value(0), expected_nanos);
677    }
678
679    #[test]
680    fn r2b_ip_field() {
681        let fds = vec![FieldDef::new("addr", WpDataType::Ip)];
682        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
683        let net = IpNetValue::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 8).unwrap();
684        let records = vec![
685            make_record(vec![Field::from_ip("addr", ip)]),
686            make_record(vec![Field::new(DataType::IP, "addr", Value::IpNet(net))]),
687        ];
688
689        let batch = records_to_batch(&records, &fds).unwrap();
690        let arr = batch.column(0).as_string::<i32>();
691        assert_eq!(arr.value(0), "192.168.1.1");
692        assert_eq!(arr.value(1), "10.0.0.0/8");
693    }
694
695    #[test]
696    fn r2b_hex_field() {
697        let fds = vec![FieldDef::new("color", WpDataType::Hex)];
698        let records = vec![make_record(vec![Field::from_hex("color", HexT(255))])];
699
700        let batch = records_to_batch(&records, &fds).unwrap();
701        let arr = batch.column(0).as_string::<i32>();
702        assert_eq!(arr.value(0), "0xFF");
703    }
704
705    #[test]
706    fn r2b_nullable_missing() {
707        let fds = vec![
708            FieldDef::new("name", WpDataType::Chars),
709            FieldDef::new("opt", WpDataType::Digit), // nullable by default
710        ];
711        let records = vec![
712            make_record(vec![Field::from_chars("name", "Alice")]),
713            // "opt" missing => should be null
714        ];
715
716        let batch = records_to_batch(&records, &fds).unwrap();
717        assert!(batch.column(1).is_null(0));
718    }
719
720    #[test]
721    fn r2b_required_missing() {
722        let fds = vec![FieldDef::new("required", WpDataType::Digit).with_nullable(false)];
723        let records = vec![make_record(vec![Field::from_chars("other", "x")])];
724
725        let err = records_to_batch(&records, &fds).unwrap_err();
726        assert!(matches!(err, WpArrowError::MissingRequiredField { .. }));
727    }
728
729    #[test]
730    fn r2b_null_value_nullable() {
731        let fds = vec![FieldDef::new("val", WpDataType::Chars)];
732        let records = vec![make_record(vec![Field::new(
733            DataType::Chars,
734            "val",
735            Value::Null,
736        )])];
737
738        let batch = records_to_batch(&records, &fds).unwrap();
739        assert!(batch.column(0).is_null(0));
740    }
741
742    #[test]
743    fn r2b_empty_records() {
744        let fds = vec![FieldDef::new("x", WpDataType::Digit)];
745        let records: Vec<DataRecord> = vec![];
746
747        let batch = records_to_batch(&records, &fds).unwrap();
748        assert_eq!(batch.num_rows(), 0);
749        assert_eq!(batch.num_columns(), 1);
750    }
751
752    #[test]
753    fn r2b_extra_fields_ignored() {
754        let fds = vec![FieldDef::new("a", WpDataType::Digit)];
755        let records = vec![make_record(vec![
756            Field::from_digit("a", 1),
757            Field::from_chars("extra", "ignored"),
758        ])];
759
760        let batch = records_to_batch(&records, &fds).unwrap();
761        assert_eq!(batch.num_columns(), 1);
762        let arr = batch
763            .column(0)
764            .as_primitive::<arrow::datatypes::Int64Type>();
765        assert_eq!(arr.value(0), 1);
766    }
767
768    #[test]
769    fn r2b_array_field() {
770        let fds = vec![FieldDef::new(
771            "tags",
772            WpDataType::Array(Box::new(WpDataType::Digit)),
773        )];
774        let items: Vec<DataField> =
775            vec![Field::from_digit("item", 10), Field::from_digit("item", 20)];
776        let records = vec![make_record(vec![Field::from_arr("tags", items)])];
777
778        let batch = records_to_batch(&records, &fds).unwrap();
779        let arr = batch
780            .column(0)
781            .as_any()
782            .downcast_ref::<ListArray>()
783            .unwrap();
784        assert_eq!(arr.len(), 1);
785        let inner = arr.value(0);
786        let inner_vals = inner.as_any().downcast_ref::<Int64Array>().unwrap();
787        assert_eq!(inner_vals.value(0), 10);
788        assert_eq!(inner_vals.value(1), 20);
789    }
790
791    #[test]
792    fn r2b_type_mismatch() {
793        let fds = vec![FieldDef::new("num", WpDataType::Digit)];
794        let records = vec![make_record(vec![Field::from_chars("num", "not_a_number")])];
795
796        let err = records_to_batch(&records, &fds).unwrap_err();
797        assert!(matches!(err, WpArrowError::ValueConversionError { .. }));
798    }
799
800    #[test]
801    fn r2b_large_batch() {
802        let fds = vec![
803            FieldDef::new("id", WpDataType::Digit),
804            FieldDef::new("name", WpDataType::Chars),
805        ];
806        let records: Vec<DataRecord> = (0..10000)
807            .map(|i| {
808                make_record(vec![
809                    Field::from_digit("id", i),
810                    Field::from_chars("name", format!("row_{i}")),
811                ])
812            })
813            .collect();
814
815        let batch = records_to_batch(&records, &fds).unwrap();
816        assert_eq!(batch.num_rows(), 10000);
817
818        let ids = batch
819            .column(0)
820            .as_primitive::<arrow::datatypes::Int64Type>();
821        assert_eq!(ids.value(0), 0);
822        assert_eq!(ids.value(9999), 9999);
823    }
824
825    // =======================================================================
826    // batch_to_records tests
827    // =======================================================================
828
829    #[test]
830    fn b2r_basic_types() {
831        let fds = vec![
832            FieldDef::new("name", WpDataType::Chars),
833            FieldDef::new("count", WpDataType::Digit),
834            FieldDef::new("ratio", WpDataType::Float),
835            FieldDef::new("active", WpDataType::Bool),
836        ];
837        // Build batch from records first
838        let records_in = vec![make_record(vec![
839            Field::from_chars("name", "Alice"),
840            Field::from_digit("count", 42),
841            Field::from_float("ratio", 1.23),
842            Field::from_bool("active", true),
843        ])];
844        let batch = records_to_batch(&records_in, &fds).unwrap();
845        let records_out = batch_to_records(&batch, &fds).unwrap();
846
847        assert_eq!(records_out.len(), 1);
848        let rec = &records_out[0];
849        assert_eq!(
850            rec.get_value("name"),
851            Some(&Value::Chars(FValueStr::from("Alice")))
852        );
853        assert_eq!(rec.get_value("count"), Some(&Value::Digit(42)));
854        assert_eq!(rec.get_value("ratio"), Some(&Value::Float(1.23)));
855        assert_eq!(rec.get_value("active"), Some(&Value::Bool(true)));
856    }
857
858    #[test]
859    fn b2r_timestamp() {
860        let fds = vec![FieldDef::new("ts", WpDataType::Time)];
861        let ndt =
862            NaiveDateTime::parse_from_str("2024-06-15 12:30:00", "%Y-%m-%d %H:%M:%S").unwrap();
863        let records_in = vec![make_record(vec![Field::from_time("ts", ndt)])];
864        let batch = records_to_batch(&records_in, &fds).unwrap();
865        let records_out = batch_to_records(&batch, &fds).unwrap();
866
867        assert_eq!(records_out[0].get_value("ts"), Some(&Value::Time(ndt)));
868    }
869
870    #[test]
871    fn b2r_ip_parsing() {
872        let fds = vec![FieldDef::new("addr", WpDataType::Ip)];
873        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
874        let net = IpNetValue::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 8).unwrap();
875        let records_in = vec![
876            make_record(vec![Field::from_ip("addr", ip)]),
877            make_record(vec![Field::new(
878                DataType::IP,
879                "addr",
880                Value::IpNet(net.clone()),
881            )]),
882        ];
883        let batch = records_to_batch(&records_in, &fds).unwrap();
884        let records_out = batch_to_records(&batch, &fds).unwrap();
885
886        assert_eq!(records_out[0].get_value("addr"), Some(&Value::IpAddr(ip)));
887        assert_eq!(records_out[1].get_value("addr"), Some(&Value::IpNet(net)));
888    }
889
890    #[test]
891    fn b2r_hex_parsing() {
892        let fds = vec![FieldDef::new("color", WpDataType::Hex)];
893        let records_in = vec![make_record(vec![Field::from_hex("color", HexT(255))])];
894        let batch = records_to_batch(&records_in, &fds).unwrap();
895        let records_out = batch_to_records(&batch, &fds).unwrap();
896
897        assert_eq!(
898            records_out[0].get_value("color"),
899            Some(&Value::Hex(HexT(255)))
900        );
901    }
902
903    #[test]
904    fn b2r_sequential_ids() {
905        let fds = vec![FieldDef::new("x", WpDataType::Digit)];
906        let records_in = vec![
907            make_record(vec![Field::from_digit("x", 1)]),
908            make_record(vec![Field::from_digit("x", 2)]),
909            make_record(vec![Field::from_digit("x", 3)]),
910        ];
911        let batch = records_to_batch(&records_in, &fds).unwrap();
912        let records_out = batch_to_records(&batch, &fds).unwrap();
913
914        assert_eq!(records_out[0].id, 0);
915        assert_eq!(records_out[1].id, 1);
916        assert_eq!(records_out[2].id, 2);
917    }
918
919    #[test]
920    fn b2r_schema_mismatch() {
921        let fds_2 = vec![
922            FieldDef::new("a", WpDataType::Digit),
923            FieldDef::new("b", WpDataType::Digit),
924        ];
925        let fds_1 = vec![FieldDef::new("a", WpDataType::Digit)];
926        let records = vec![make_record(vec![Field::from_digit("a", 1)])];
927        let batch = records_to_batch(&records, &fds_1).unwrap();
928
929        let err = batch_to_records(&batch, &fds_2).unwrap_err();
930        assert!(matches!(
931            err,
932            WpArrowError::SchemaMismatch {
933                expected: 2,
934                actual: 1
935            }
936        ));
937    }
938
939    // =======================================================================
940    // Roundtrip tests
941    // =======================================================================
942
943    #[test]
944    fn roundtrip_all_types() {
945        let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
946        let net = IpNetValue::new(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 0)), 12).unwrap();
947        let ndt =
948            NaiveDateTime::parse_from_str("2025-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();
949
950        let fds = vec![
951            FieldDef::new("chars", WpDataType::Chars),
952            FieldDef::new("digit", WpDataType::Digit),
953            FieldDef::new("float", WpDataType::Float),
954            FieldDef::new("bool", WpDataType::Bool),
955            FieldDef::new("time", WpDataType::Time),
956            FieldDef::new("ip", WpDataType::Ip),
957            FieldDef::new("hex", WpDataType::Hex),
958            FieldDef::new("nums", WpDataType::Array(Box::new(WpDataType::Digit))),
959        ];
960
961        let arr_items: Vec<DataField> = vec![
962            Field::from_digit("item", 100),
963            Field::from_digit("item", 200),
964        ];
965
966        let records_in = vec![
967            make_record(vec![
968                Field::from_chars("chars", "hello"),
969                Field::from_digit("digit", 42),
970                Field::from_float("float", 9.876),
971                Field::from_bool("bool", true),
972                Field::from_time("time", ndt),
973                Field::from_ip("ip", ip),
974                Field::from_hex("hex", HexT(0xDEAD)),
975                Field::from_arr("nums", arr_items),
976            ]),
977            make_record(vec![
978                Field::from_chars("chars", "world"),
979                Field::from_digit("digit", -1),
980                Field::from_float("float", 0.0),
981                Field::from_bool("bool", false),
982                Field::from_time("time", ndt),
983                Field::new(DataType::IP, "ip", Value::IpNet(net.clone())),
984                Field::from_hex("hex", HexT(0)),
985                Field::from_arr("nums", vec![Field::from_digit("item", 300)]),
986            ]),
987        ];
988
989        let batch = records_to_batch(&records_in, &fds).unwrap();
990        let records_out = batch_to_records(&batch, &fds).unwrap();
991
992        assert_eq!(records_out.len(), 2);
993
994        // Row 0
995        assert_eq!(
996            records_out[0].get_value("chars"),
997            Some(&Value::Chars(FValueStr::from("hello")))
998        );
999        assert_eq!(records_out[0].get_value("digit"), Some(&Value::Digit(42)));
1000        assert_eq!(
1001            records_out[0].get_value("float"),
1002            Some(&Value::Float(9.876))
1003        );
1004        assert_eq!(records_out[0].get_value("bool"), Some(&Value::Bool(true)));
1005        assert_eq!(records_out[0].get_value("time"), Some(&Value::Time(ndt)));
1006        assert_eq!(records_out[0].get_value("ip"), Some(&Value::IpAddr(ip)));
1007        assert_eq!(
1008            records_out[0].get_value("hex"),
1009            Some(&Value::Hex(HexT(0xDEAD)))
1010        );
1011
1012        // Verify array field
1013        if let Some(Value::Array(items)) = records_out[0].get_value("nums") {
1014            assert_eq!(items.len(), 2);
1015            assert_eq!(items[0].get_value(), &Value::Digit(100));
1016            assert_eq!(items[1].get_value(), &Value::Digit(200));
1017        } else {
1018            panic!("expected Array value for 'nums'");
1019        }
1020
1021        // Row 1
1022        assert_eq!(records_out[1].get_value("ip"), Some(&Value::IpNet(net)));
1023        assert_eq!(records_out[1].get_value("hex"), Some(&Value::Hex(HexT(0))));
1024    }
1025
1026    #[test]
1027    fn roundtrip_with_nulls() {
1028        let fds = vec![
1029            FieldDef::new("name", WpDataType::Chars),
1030            FieldDef::new("opt_digit", WpDataType::Digit),
1031        ];
1032
1033        let records_in = vec![
1034            make_record(vec![
1035                Field::from_chars("name", "row1"),
1036                Field::from_digit("opt_digit", 100),
1037            ]),
1038            make_record(vec![
1039                Field::from_chars("name", "row2"),
1040                // opt_digit missing => null
1041            ]),
1042        ];
1043
1044        let batch = records_to_batch(&records_in, &fds).unwrap();
1045        let records_out = batch_to_records(&batch, &fds).unwrap();
1046
1047        assert_eq!(records_out.len(), 2);
1048        assert_eq!(
1049            records_out[0].get_value("opt_digit"),
1050            Some(&Value::Digit(100))
1051        );
1052        // null field should be absent from the record (we skip nulls in batch_to_records)
1053        assert_eq!(records_out[1].get_value("opt_digit"), None);
1054    }
1055}