1use std::net::IpAddr;
2use std::sync::Arc;
3
4use arrow::array::{
5 Array, BooleanArray, Float64Array, Int64Array, ListArray, StringArray, TimestampNanosecondArray,
6};
7use arrow::array::{
8 ArrayRef, BooleanBuilder, Float64Builder, Int64Builder, ListBuilder, RecordBatch,
9 StringBuilder, TimestampNanosecondBuilder,
10};
11use chrono::DateTime;
12
13use wp_model_core::model::{
14 DataRecord, DataType, FValueStr, Field, FieldStorage, HexT, IpNetValue, Value,
15};
16
17use crate::error::WpArrowError;
18use crate::schema::{FieldDef, WpDataType, to_arrow_schema};
19
20pub fn records_to_batch(
26 records: &[DataRecord],
27 field_defs: &[FieldDef],
28) -> Result<RecordBatch, WpArrowError> {
29 let schema = to_arrow_schema(field_defs)?;
30 let columns: Vec<ArrayRef> = field_defs
31 .iter()
32 .map(|fd| build_column(fd, records))
33 .collect::<Result<_, _>>()?;
34 RecordBatch::try_new(Arc::new(schema), columns)
35 .map_err(|e| WpArrowError::ArrowBuildError(e.to_string()))
36}
37
38pub fn batch_to_records(
43 batch: &RecordBatch,
44 field_defs: &[FieldDef],
45) -> Result<Vec<DataRecord>, WpArrowError> {
46 if field_defs.len() != batch.num_columns() {
47 return Err(WpArrowError::SchemaMismatch {
48 expected: field_defs.len(),
49 actual: batch.num_columns(),
50 });
51 }
52
53 let num_rows = batch.num_rows();
54 let mut records = Vec::with_capacity(num_rows);
55
56 for row_idx in 0..num_rows {
57 let mut items = Vec::with_capacity(field_defs.len());
58 for (col_idx, fd) in field_defs.iter().enumerate() {
59 let col = batch.column(col_idx);
60 if col.is_null(row_idx) {
61 continue;
62 }
63 let value = extract_value(col, row_idx, &fd.data_type, &fd.name)?;
64 let meta = wp_type_to_model_meta(&fd.data_type);
65 let field = Field::new(meta, fd.name.as_str(), value);
66 items.push(FieldStorage::from_owned(field));
67 }
68 let mut record = DataRecord::from(items);
69 record.id = row_idx as u64;
70 records.push(record);
71 }
72
73 Ok(records)
74}
75
76fn build_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
81 match &fd.data_type {
82 WpDataType::Chars | WpDataType::Ip | WpDataType::Hex => build_string_column(fd, records),
83 WpDataType::Digit => build_digit_column(fd, records),
84 WpDataType::Float => build_float_column(fd, records),
85 WpDataType::Bool => build_bool_column(fd, records),
86 WpDataType::Time => build_time_column(fd, records),
87 WpDataType::Array(inner) => build_list_column(fd, records, inner),
88 }
89}
90
91fn build_string_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
92 let mut builder = StringBuilder::with_capacity(records.len(), records.len() * 32);
93 for rec in records {
94 match rec.get_value(&fd.name) {
95 Some(Value::Null) | None => {
96 handle_null(&mut builder, fd, |b| b.append_null())?;
97 }
98 Some(val) => {
99 let s = value_to_string(val, &fd.data_type, &fd.name)?;
100 builder.append_value(&s);
101 }
102 }
103 }
104 Ok(Arc::new(builder.finish()))
105}
106
107fn build_digit_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
108 let mut builder = Int64Builder::with_capacity(records.len());
109 for rec in records {
110 match rec.get_value(&fd.name) {
111 Some(Value::Null) | None => {
112 handle_null(&mut builder, fd, |b| b.append_null())?;
113 }
114 Some(Value::Digit(v)) => builder.append_value(*v),
115 Some(other) => {
116 return Err(WpArrowError::ValueConversionError {
117 field_name: fd.name.clone(),
118 expected: "Digit".to_string(),
119 actual: other.tag().to_string(),
120 });
121 }
122 }
123 }
124 Ok(Arc::new(builder.finish()))
125}
126
127fn build_float_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
128 let mut builder = Float64Builder::with_capacity(records.len());
129 for rec in records {
130 match rec.get_value(&fd.name) {
131 Some(Value::Null) | None => {
132 handle_null(&mut builder, fd, |b| b.append_null())?;
133 }
134 Some(Value::Float(v)) => builder.append_value(*v),
135 Some(other) => {
136 return Err(WpArrowError::ValueConversionError {
137 field_name: fd.name.clone(),
138 expected: "Float".to_string(),
139 actual: other.tag().to_string(),
140 });
141 }
142 }
143 }
144 Ok(Arc::new(builder.finish()))
145}
146
147fn build_bool_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
148 let mut builder = BooleanBuilder::with_capacity(records.len());
149 for rec in records {
150 match rec.get_value(&fd.name) {
151 Some(Value::Null) | None => {
152 handle_null(&mut builder, fd, |b| b.append_null())?;
153 }
154 Some(Value::Bool(v)) => builder.append_value(*v),
155 Some(other) => {
156 return Err(WpArrowError::ValueConversionError {
157 field_name: fd.name.clone(),
158 expected: "Bool".to_string(),
159 actual: other.tag().to_string(),
160 });
161 }
162 }
163 }
164 Ok(Arc::new(builder.finish()))
165}
166
167fn build_time_column(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
168 let mut builder = TimestampNanosecondBuilder::with_capacity(records.len());
169 for rec in records {
170 match rec.get_value(&fd.name) {
171 Some(Value::Null) | None => {
172 handle_null(&mut builder, fd, |b| b.append_null())?;
173 }
174 Some(Value::Time(ndt)) => {
175 let nanos = ndt.and_utc().timestamp_nanos_opt().ok_or_else(|| {
176 WpArrowError::TimestampOverflow {
177 field_name: fd.name.clone(),
178 }
179 })?;
180 builder.append_value(nanos);
181 }
182 Some(other) => {
183 return Err(WpArrowError::ValueConversionError {
184 field_name: fd.name.clone(),
185 expected: "Time".to_string(),
186 actual: other.tag().to_string(),
187 });
188 }
189 }
190 }
191 Ok(Arc::new(builder.finish()))
192}
193
194fn build_list_column(
195 fd: &FieldDef,
196 records: &[DataRecord],
197 inner_type: &WpDataType,
198) -> Result<ArrayRef, WpArrowError> {
199 match inner_type {
200 WpDataType::Chars | WpDataType::Ip | WpDataType::Hex => {
201 build_list_string(fd, records, inner_type)
202 }
203 WpDataType::Digit => build_list_digit(fd, records),
204 WpDataType::Float => build_list_float(fd, records),
205 WpDataType::Bool => build_list_bool(fd, records),
206 WpDataType::Time => build_list_time(fd, records),
207 WpDataType::Array(_) => Err(WpArrowError::UnsupportedDataType(
208 "nested array<array<...>> not supported".to_string(),
209 )),
210 }
211}
212
213fn build_list_string(
214 fd: &FieldDef,
215 records: &[DataRecord],
216 inner_type: &WpDataType,
217) -> Result<ArrayRef, WpArrowError> {
218 let mut builder = ListBuilder::new(StringBuilder::new());
219 for rec in records {
220 match rec.get_value(&fd.name) {
221 Some(Value::Null) | None => {
222 handle_null(&mut builder, fd, |b| b.append_null())?;
223 }
224 Some(Value::Array(items)) => {
225 for item in items {
226 let val = item.get_value();
227 if matches!(val, Value::Null) {
228 builder.values().append_null();
229 } else {
230 let s = value_to_string(val, inner_type, &fd.name)?;
231 builder.values().append_value(&s);
232 }
233 }
234 builder.append(true);
235 }
236 Some(other) => {
237 return Err(WpArrowError::ValueConversionError {
238 field_name: fd.name.clone(),
239 expected: "Array".to_string(),
240 actual: other.tag().to_string(),
241 });
242 }
243 }
244 }
245 Ok(Arc::new(builder.finish()))
246}
247
248fn build_list_digit(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
249 let mut builder = ListBuilder::new(Int64Builder::new());
250 for rec in records {
251 match rec.get_value(&fd.name) {
252 Some(Value::Null) | None => {
253 handle_null(&mut builder, fd, |b| b.append_null())?;
254 }
255 Some(Value::Array(items)) => {
256 for item in items {
257 match item.get_value() {
258 Value::Digit(v) => builder.values().append_value(*v),
259 Value::Null => builder.values().append_null(),
260 other => {
261 return Err(WpArrowError::ValueConversionError {
262 field_name: fd.name.clone(),
263 expected: "Digit".to_string(),
264 actual: other.tag().to_string(),
265 });
266 }
267 }
268 }
269 builder.append(true);
270 }
271 Some(other) => {
272 return Err(WpArrowError::ValueConversionError {
273 field_name: fd.name.clone(),
274 expected: "Array".to_string(),
275 actual: other.tag().to_string(),
276 });
277 }
278 }
279 }
280 Ok(Arc::new(builder.finish()))
281}
282
283fn build_list_float(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
284 let mut builder = ListBuilder::new(Float64Builder::new());
285 for rec in records {
286 match rec.get_value(&fd.name) {
287 Some(Value::Null) | None => {
288 handle_null(&mut builder, fd, |b| b.append_null())?;
289 }
290 Some(Value::Array(items)) => {
291 for item in items {
292 match item.get_value() {
293 Value::Float(v) => builder.values().append_value(*v),
294 Value::Null => builder.values().append_null(),
295 other => {
296 return Err(WpArrowError::ValueConversionError {
297 field_name: fd.name.clone(),
298 expected: "Float".to_string(),
299 actual: other.tag().to_string(),
300 });
301 }
302 }
303 }
304 builder.append(true);
305 }
306 Some(other) => {
307 return Err(WpArrowError::ValueConversionError {
308 field_name: fd.name.clone(),
309 expected: "Array".to_string(),
310 actual: other.tag().to_string(),
311 });
312 }
313 }
314 }
315 Ok(Arc::new(builder.finish()))
316}
317
318fn build_list_bool(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
319 let mut builder = ListBuilder::new(BooleanBuilder::new());
320 for rec in records {
321 match rec.get_value(&fd.name) {
322 Some(Value::Null) | None => {
323 handle_null(&mut builder, fd, |b| b.append_null())?;
324 }
325 Some(Value::Array(items)) => {
326 for item in items {
327 match item.get_value() {
328 Value::Bool(v) => builder.values().append_value(*v),
329 Value::Null => builder.values().append_null(),
330 other => {
331 return Err(WpArrowError::ValueConversionError {
332 field_name: fd.name.clone(),
333 expected: "Bool".to_string(),
334 actual: other.tag().to_string(),
335 });
336 }
337 }
338 }
339 builder.append(true);
340 }
341 Some(other) => {
342 return Err(WpArrowError::ValueConversionError {
343 field_name: fd.name.clone(),
344 expected: "Array".to_string(),
345 actual: other.tag().to_string(),
346 });
347 }
348 }
349 }
350 Ok(Arc::new(builder.finish()))
351}
352
353fn build_list_time(fd: &FieldDef, records: &[DataRecord]) -> Result<ArrayRef, WpArrowError> {
354 let mut builder = ListBuilder::new(TimestampNanosecondBuilder::new());
355 for rec in records {
356 match rec.get_value(&fd.name) {
357 Some(Value::Null) | None => {
358 handle_null(&mut builder, fd, |b| b.append_null())?;
359 }
360 Some(Value::Array(items)) => {
361 for item in items {
362 match item.get_value() {
363 Value::Time(ndt) => {
364 let nanos = ndt.and_utc().timestamp_nanos_opt().ok_or_else(|| {
365 WpArrowError::TimestampOverflow {
366 field_name: fd.name.clone(),
367 }
368 })?;
369 builder.values().append_value(nanos);
370 }
371 Value::Null => builder.values().append_null(),
372 other => {
373 return Err(WpArrowError::ValueConversionError {
374 field_name: fd.name.clone(),
375 expected: "Time".to_string(),
376 actual: other.tag().to_string(),
377 });
378 }
379 }
380 }
381 builder.append(true);
382 }
383 Some(other) => {
384 return Err(WpArrowError::ValueConversionError {
385 field_name: fd.name.clone(),
386 expected: "Array".to_string(),
387 actual: other.tag().to_string(),
388 });
389 }
390 }
391 }
392 Ok(Arc::new(builder.finish()))
393}
394
395fn value_to_string(
397 val: &Value,
398 wp_type: &WpDataType,
399 field_name: &str,
400) -> Result<String, WpArrowError> {
401 match (wp_type, val) {
402 (WpDataType::Chars, Value::Chars(s)) => Ok(s.to_string()),
404 (WpDataType::Chars, Value::Domain(d)) => Ok(d.to_string()),
405 (WpDataType::Chars, Value::Url(u)) => Ok(u.to_string()),
406 (WpDataType::Chars, Value::Email(e)) => Ok(e.to_string()),
407 (WpDataType::Ip, Value::IpAddr(ip)) => Ok(ip.to_string()),
409 (WpDataType::Ip, Value::IpNet(net)) => Ok(net.to_string()),
410 (WpDataType::Ip, Value::Chars(s)) => Ok(s.to_string()),
411 (WpDataType::Hex, Value::Hex(h)) => Ok(format!("{:#X}", h.0)),
413 _ => Err(WpArrowError::ValueConversionError {
414 field_name: field_name.to_string(),
415 expected: format!("{:?}", wp_type),
416 actual: val.tag().to_string(),
417 }),
418 }
419}
420
421fn handle_null<B, F>(builder: &mut B, fd: &FieldDef, append_null: F) -> Result<(), WpArrowError>
423where
424 F: FnOnce(&mut B),
425{
426 if fd.nullable {
427 append_null(builder);
428 Ok(())
429 } else {
430 Err(WpArrowError::MissingRequiredField {
431 field_name: fd.name.clone(),
432 })
433 }
434}
435
436fn extract_value(
442 col: &ArrayRef,
443 row_idx: usize,
444 wp_type: &WpDataType,
445 field_name: &str,
446) -> Result<Value, WpArrowError> {
447 match wp_type {
448 WpDataType::Chars => {
449 let arr = col
450 .as_any()
451 .downcast_ref::<StringArray>()
452 .ok_or_else(|| WpArrowError::ArrowBuildError("expected StringArray".to_string()))?;
453 Ok(Value::Chars(FValueStr::from(arr.value(row_idx))))
454 }
455 WpDataType::Digit => {
456 let arr = col
457 .as_any()
458 .downcast_ref::<Int64Array>()
459 .ok_or_else(|| WpArrowError::ArrowBuildError("expected Int64Array".to_string()))?;
460 Ok(Value::Digit(arr.value(row_idx)))
461 }
462 WpDataType::Float => {
463 let arr = col.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
464 WpArrowError::ArrowBuildError("expected Float64Array".to_string())
465 })?;
466 Ok(Value::Float(arr.value(row_idx)))
467 }
468 WpDataType::Bool => {
469 let arr = col.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
470 WpArrowError::ArrowBuildError("expected BooleanArray".to_string())
471 })?;
472 Ok(Value::Bool(arr.value(row_idx)))
473 }
474 WpDataType::Time => {
475 let arr = col
476 .as_any()
477 .downcast_ref::<TimestampNanosecondArray>()
478 .ok_or_else(|| {
479 WpArrowError::ArrowBuildError("expected TimestampNanosecondArray".to_string())
480 })?;
481 let nanos = arr.value(row_idx);
482 let ndt = DateTime::from_timestamp_nanos(nanos).naive_utc();
483 Ok(Value::Time(ndt))
484 }
485 WpDataType::Ip => {
486 let arr = col
487 .as_any()
488 .downcast_ref::<StringArray>()
489 .ok_or_else(|| WpArrowError::ArrowBuildError("expected StringArray".to_string()))?;
490 let s = arr.value(row_idx);
491 Ok(parse_ip_value(s, field_name)?)
492 }
493 WpDataType::Hex => {
494 let arr = col
495 .as_any()
496 .downcast_ref::<StringArray>()
497 .ok_or_else(|| WpArrowError::ArrowBuildError("expected StringArray".to_string()))?;
498 let s = arr.value(row_idx);
499 Ok(parse_hex_value(s, field_name)?)
500 }
501 WpDataType::Array(inner) => {
502 let arr = col
503 .as_any()
504 .downcast_ref::<ListArray>()
505 .ok_or_else(|| WpArrowError::ArrowBuildError("expected ListArray".to_string()))?;
506 let inner_arr = arr.value(row_idx);
507 let inner_meta = wp_type_to_model_meta(inner);
508 let mut items = Vec::new();
509 for i in 0..inner_arr.len() {
510 if inner_arr.is_null(i) {
511 items.push(FieldStorage::from_owned(Field::new(
512 inner_meta.clone(),
513 "item",
514 Value::Null,
515 )));
516 } else {
517 let val = extract_value(&inner_arr, i, inner, field_name)?;
518 items.push(FieldStorage::from_owned(Field::new(
519 inner_meta.clone(),
520 "item",
521 val,
522 )));
523 }
524 }
525 Ok(Value::Array(items))
526 }
527 }
528}
529
530fn parse_ip_value(s: &str, field_name: &str) -> Result<Value, WpArrowError> {
532 if s.contains('/') {
533 let parts: Vec<&str> = s.splitn(2, '/').collect();
535 let addr: IpAddr = parts[0].parse().map_err(|e| WpArrowError::ParseError {
536 field_name: field_name.to_string(),
537 detail: format!("invalid IP address: {e}"),
538 })?;
539 let prefix: u8 = parts[1].parse().map_err(|e| WpArrowError::ParseError {
540 field_name: field_name.to_string(),
541 detail: format!("invalid prefix length: {e}"),
542 })?;
543 let net = IpNetValue::new(addr, prefix).ok_or_else(|| WpArrowError::ParseError {
544 field_name: field_name.to_string(),
545 detail: format!("invalid prefix length {prefix} for {addr}"),
546 })?;
547 Ok(Value::IpNet(net))
548 } else {
549 let addr: IpAddr = s.parse().map_err(|e| WpArrowError::ParseError {
551 field_name: field_name.to_string(),
552 detail: format!("invalid IP address: {e}"),
553 })?;
554 Ok(Value::IpAddr(addr))
555 }
556}
557
558fn parse_hex_value(s: &str, field_name: &str) -> Result<Value, WpArrowError> {
560 let hex_str = s
561 .strip_prefix("0x")
562 .or_else(|| s.strip_prefix("0X"))
563 .unwrap_or(s);
564 let v = u128::from_str_radix(hex_str, 16).map_err(|e| WpArrowError::ParseError {
565 field_name: field_name.to_string(),
566 detail: format!("invalid hex: {e}"),
567 })?;
568 Ok(Value::Hex(HexT(v)))
569}
570
571fn wp_type_to_model_meta(wp_type: &WpDataType) -> DataType {
573 match wp_type {
574 WpDataType::Chars => DataType::Chars,
575 WpDataType::Digit => DataType::Digit,
576 WpDataType::Float => DataType::Float,
577 WpDataType::Bool => DataType::Bool,
578 WpDataType::Time => DataType::Time,
579 WpDataType::Ip => DataType::IP,
580 WpDataType::Hex => DataType::Hex,
581 WpDataType::Array(inner) => {
582 let inner_name = match inner.as_ref() {
583 WpDataType::Chars => "chars",
584 WpDataType::Digit => "digit",
585 WpDataType::Float => "float",
586 WpDataType::Bool => "bool",
587 WpDataType::Time => "time",
588 WpDataType::Ip => "ip",
589 WpDataType::Hex => "hex",
590 WpDataType::Array(_) => "array",
591 };
592 DataType::Array(inner_name.to_string())
593 }
594 }
595}
596
#[cfg(test)]
mod tests {
    // Test groups:
    // - `r2b_*`: `records_to_batch` (DataRecord -> RecordBatch).
    // - `b2r_*`: `batch_to_records` (RecordBatch -> DataRecord); the input batches
    //   are produced by `records_to_batch`.
    // - `roundtrip_*`: both directions, asserting values survive unchanged.
    use super::*;
    use crate::schema::{FieldDef, WpDataType};
    use arrow::array::AsArray;
    use chrono::NaiveDateTime;
    use std::net::{IpAddr, Ipv4Addr};
    use wp_model_core::model::{DataField, DataRecord, Field, Value};

    /// Builds a `DataRecord` from a list of fields (keeps the fixtures terse).
    fn make_record(fields: Vec<DataField>) -> DataRecord {
        DataRecord::from(fields)
    }

    /// Chars/Digit/Float/Bool fields become string, Int64, Float64 and Boolean
    /// columns with per-row values preserved.
    #[test]
    fn r2b_basic_types() {
        let fds = vec![
            FieldDef::new("name", WpDataType::Chars),
            FieldDef::new("count", WpDataType::Digit),
            FieldDef::new("ratio", WpDataType::Float),
            FieldDef::new("active", WpDataType::Bool),
        ];
        let records = vec![
            make_record(vec![
                Field::from_chars("name", "Alice"),
                Field::from_digit("count", 10),
                Field::from_float("ratio", 1.5),
                Field::from_bool("active", true),
            ]),
            make_record(vec![
                Field::from_chars("name", "Bob"),
                Field::from_digit("count", 20),
                Field::from_float("ratio", 2.5),
                Field::from_bool("active", false),
            ]),
        ];

        let batch = records_to_batch(&records, &fds).unwrap();
        assert_eq!(batch.num_columns(), 4);
        assert_eq!(batch.num_rows(), 2);

        let names = batch.column(0).as_string::<i32>();
        assert_eq!(names.value(0), "Alice");
        assert_eq!(names.value(1), "Bob");

        let counts = batch
            .column(1)
            .as_primitive::<arrow::datatypes::Int64Type>();
        assert_eq!(counts.value(0), 10);
        assert_eq!(counts.value(1), 20);

        let ratios = batch
            .column(2)
            .as_primitive::<arrow::datatypes::Float64Type>();
        assert!((ratios.value(0) - 1.5).abs() < f64::EPSILON);

        let actives = batch.column(3).as_boolean();
        assert!(actives.value(0));
        assert!(!actives.value(1));
    }

    /// A Time field is stored as UTC nanoseconds since the epoch.
    #[test]
    fn r2b_time_field() {
        let fds = vec![FieldDef::new("ts", WpDataType::Time)];
        let ndt =
            NaiveDateTime::parse_from_str("2024-06-15 12:30:00", "%Y-%m-%d %H:%M:%S").unwrap();
        let records = vec![make_record(vec![Field::from_time("ts", ndt)])];

        let batch = records_to_batch(&records, &fds).unwrap();
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        let expected_nanos = ndt.and_utc().timestamp_nanos_opt().unwrap();
        assert_eq!(arr.value(0), expected_nanos);
    }

    /// IP addresses and CIDR networks are both rendered as text in one string column.
    #[test]
    fn r2b_ip_field() {
        let fds = vec![FieldDef::new("addr", WpDataType::Ip)];
        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
        let net = IpNetValue::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 8).unwrap();
        let records = vec![
            make_record(vec![Field::from_ip("addr", ip)]),
            make_record(vec![Field::new(DataType::IP, "addr", Value::IpNet(net))]),
        ];

        let batch = records_to_batch(&records, &fds).unwrap();
        let arr = batch.column(0).as_string::<i32>();
        assert_eq!(arr.value(0), "192.168.1.1");
        assert_eq!(arr.value(1), "10.0.0.0/8");
    }

    /// Hex values are rendered with a lower-case `0x` prefix and upper-case digits.
    #[test]
    fn r2b_hex_field() {
        let fds = vec![FieldDef::new("color", WpDataType::Hex)];
        let records = vec![make_record(vec![Field::from_hex("color", HexT(255))])];

        let batch = records_to_batch(&records, &fds).unwrap();
        let arr = batch.column(0).as_string::<i32>();
        assert_eq!(arr.value(0), "0xFF");
    }

    /// A field absent from the record becomes a null cell. This relies on
    /// `FieldDef::new` producing a nullable definition (`handle_null` would
    /// otherwise return `MissingRequiredField`).
    #[test]
    fn r2b_nullable_missing() {
        let fds = vec![
            FieldDef::new("name", WpDataType::Chars),
            FieldDef::new("opt", WpDataType::Digit),
        ];
        let records = vec![
            make_record(vec![Field::from_chars("name", "Alice")]),
        ];

        let batch = records_to_batch(&records, &fds).unwrap();
        assert!(batch.column(1).is_null(0));
    }

    /// A field marked non-nullable that is absent from a record is an error.
    #[test]
    fn r2b_required_missing() {
        let fds = vec![FieldDef::new("required", WpDataType::Digit).with_nullable(false)];
        let records = vec![make_record(vec![Field::from_chars("other", "x")])];

        let err = records_to_batch(&records, &fds).unwrap_err();
        assert!(matches!(err, WpArrowError::MissingRequiredField { .. }));
    }

    /// An explicit `Value::Null` in a nullable field becomes a null cell.
    #[test]
    fn r2b_null_value_nullable() {
        let fds = vec![FieldDef::new("val", WpDataType::Chars)];
        let records = vec![make_record(vec![Field::new(
            DataType::Chars,
            "val",
            Value::Null,
        )])];

        let batch = records_to_batch(&records, &fds).unwrap();
        assert!(batch.column(0).is_null(0));
    }

    /// No records still yields a batch with the declared columns and zero rows.
    #[test]
    fn r2b_empty_records() {
        let fds = vec![FieldDef::new("x", WpDataType::Digit)];
        let records: Vec<DataRecord> = vec![];

        let batch = records_to_batch(&records, &fds).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 1);
    }

    /// Record fields not listed in the field definitions are dropped.
    #[test]
    fn r2b_extra_fields_ignored() {
        let fds = vec![FieldDef::new("a", WpDataType::Digit)];
        let records = vec![make_record(vec![
            Field::from_digit("a", 1),
            Field::from_chars("extra", "ignored"),
        ])];

        let batch = records_to_batch(&records, &fds).unwrap();
        assert_eq!(batch.num_columns(), 1);
        let arr = batch
            .column(0)
            .as_primitive::<arrow::datatypes::Int64Type>();
        assert_eq!(arr.value(0), 1);
    }

    /// An array-of-Digit field becomes a `ListArray` whose values are Int64.
    #[test]
    fn r2b_array_field() {
        let fds = vec![FieldDef::new(
            "tags",
            WpDataType::Array(Box::new(WpDataType::Digit)),
        )];
        let items: Vec<DataField> =
            vec![Field::from_digit("item", 10), Field::from_digit("item", 20)];
        let records = vec![make_record(vec![Field::from_arr("tags", items)])];

        let batch = records_to_batch(&records, &fds).unwrap();
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(arr.len(), 1);
        let inner = arr.value(0);
        let inner_vals = inner.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(inner_vals.value(0), 10);
        assert_eq!(inner_vals.value(1), 20);
    }

    /// A value whose type doesn't match the declared column type is rejected.
    #[test]
    fn r2b_type_mismatch() {
        let fds = vec![FieldDef::new("num", WpDataType::Digit)];
        let records = vec![make_record(vec![Field::from_chars("num", "not_a_number")])];

        let err = records_to_batch(&records, &fds).unwrap_err();
        assert!(matches!(err, WpArrowError::ValueConversionError { .. }));
    }

    /// Sanity check at 10k rows: row count and first/last values are preserved.
    #[test]
    fn r2b_large_batch() {
        let fds = vec![
            FieldDef::new("id", WpDataType::Digit),
            FieldDef::new("name", WpDataType::Chars),
        ];
        let records: Vec<DataRecord> = (0..10000)
            .map(|i| {
                make_record(vec![
                    Field::from_digit("id", i),
                    Field::from_chars("name", format!("row_{i}")),
                ])
            })
            .collect();

        let batch = records_to_batch(&records, &fds).unwrap();
        assert_eq!(batch.num_rows(), 10000);

        let ids = batch
            .column(0)
            .as_primitive::<arrow::datatypes::Int64Type>();
        assert_eq!(ids.value(0), 0);
        assert_eq!(ids.value(9999), 9999);
    }

    /// Scalar types decode back to the same `Value` variants.
    #[test]
    fn b2r_basic_types() {
        let fds = vec![
            FieldDef::new("name", WpDataType::Chars),
            FieldDef::new("count", WpDataType::Digit),
            FieldDef::new("ratio", WpDataType::Float),
            FieldDef::new("active", WpDataType::Bool),
        ];
        let records_in = vec![make_record(vec![
            Field::from_chars("name", "Alice"),
            Field::from_digit("count", 42),
            Field::from_float("ratio", 1.23),
            Field::from_bool("active", true),
        ])];
        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(records_out.len(), 1);
        let rec = &records_out[0];
        assert_eq!(
            rec.get_value("name"),
            Some(&Value::Chars(FValueStr::from("Alice")))
        );
        assert_eq!(rec.get_value("count"), Some(&Value::Digit(42)));
        assert_eq!(rec.get_value("ratio"), Some(&Value::Float(1.23)));
        assert_eq!(rec.get_value("active"), Some(&Value::Bool(true)));
    }

    /// Nanosecond timestamps decode back to the original naive datetime.
    #[test]
    fn b2r_timestamp() {
        let fds = vec![FieldDef::new("ts", WpDataType::Time)];
        let ndt =
            NaiveDateTime::parse_from_str("2024-06-15 12:30:00", "%Y-%m-%d %H:%M:%S").unwrap();
        let records_in = vec![make_record(vec![Field::from_time("ts", ndt)])];
        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(records_out[0].get_value("ts"), Some(&Value::Time(ndt)));
    }

    /// IP text without `/` decodes to `IpAddr`; CIDR text decodes to `IpNet`.
    #[test]
    fn b2r_ip_parsing() {
        let fds = vec![FieldDef::new("addr", WpDataType::Ip)];
        let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
        let net = IpNetValue::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 0)), 8).unwrap();
        let records_in = vec![
            make_record(vec![Field::from_ip("addr", ip)]),
            make_record(vec![Field::new(
                DataType::IP,
                "addr",
                Value::IpNet(net.clone()),
            )]),
        ];
        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(records_out[0].get_value("addr"), Some(&Value::IpAddr(ip)));
        assert_eq!(records_out[1].get_value("addr"), Some(&Value::IpNet(net)));
    }

    /// `0x`-prefixed hex text decodes back to the same `HexT` value.
    #[test]
    fn b2r_hex_parsing() {
        let fds = vec![FieldDef::new("color", WpDataType::Hex)];
        let records_in = vec![make_record(vec![Field::from_hex("color", HexT(255))])];
        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(
            records_out[0].get_value("color"),
            Some(&Value::Hex(HexT(255)))
        );
    }

    /// Decoded record ids are the zero-based row indices of the batch.
    #[test]
    fn b2r_sequential_ids() {
        let fds = vec![FieldDef::new("x", WpDataType::Digit)];
        let records_in = vec![
            make_record(vec![Field::from_digit("x", 1)]),
            make_record(vec![Field::from_digit("x", 2)]),
            make_record(vec![Field::from_digit("x", 3)]),
        ];
        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(records_out[0].id, 0);
        assert_eq!(records_out[1].id, 1);
        assert_eq!(records_out[2].id, 2);
    }

    /// Decoding with more field definitions than batch columns reports both counts.
    #[test]
    fn b2r_schema_mismatch() {
        let fds_2 = vec![
            FieldDef::new("a", WpDataType::Digit),
            FieldDef::new("b", WpDataType::Digit),
        ];
        let fds_1 = vec![FieldDef::new("a", WpDataType::Digit)];
        let records = vec![make_record(vec![Field::from_digit("a", 1)])];
        let batch = records_to_batch(&records, &fds_1).unwrap();

        let err = batch_to_records(&batch, &fds_2).unwrap_err();
        assert!(matches!(
            err,
            WpArrowError::SchemaMismatch {
                expected: 2,
                actual: 1
            }
        ));
    }

    /// Every supported type, including an Array<Digit> and an IpNet, survives a
    /// full round trip.
    #[test]
    fn roundtrip_all_types() {
        let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
        let net = IpNetValue::new(IpAddr::V4(Ipv4Addr::new(172, 16, 0, 0)), 12).unwrap();
        let ndt =
            NaiveDateTime::parse_from_str("2025-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();

        let fds = vec![
            FieldDef::new("chars", WpDataType::Chars),
            FieldDef::new("digit", WpDataType::Digit),
            FieldDef::new("float", WpDataType::Float),
            FieldDef::new("bool", WpDataType::Bool),
            FieldDef::new("time", WpDataType::Time),
            FieldDef::new("ip", WpDataType::Ip),
            FieldDef::new("hex", WpDataType::Hex),
            FieldDef::new("nums", WpDataType::Array(Box::new(WpDataType::Digit))),
        ];

        let arr_items: Vec<DataField> = vec![
            Field::from_digit("item", 100),
            Field::from_digit("item", 200),
        ];

        let records_in = vec![
            make_record(vec![
                Field::from_chars("chars", "hello"),
                Field::from_digit("digit", 42),
                Field::from_float("float", 9.876),
                Field::from_bool("bool", true),
                Field::from_time("time", ndt),
                Field::from_ip("ip", ip),
                Field::from_hex("hex", HexT(0xDEAD)),
                Field::from_arr("nums", arr_items),
            ]),
            make_record(vec![
                Field::from_chars("chars", "world"),
                Field::from_digit("digit", -1),
                Field::from_float("float", 0.0),
                Field::from_bool("bool", false),
                Field::from_time("time", ndt),
                Field::new(DataType::IP, "ip", Value::IpNet(net.clone())),
                Field::from_hex("hex", HexT(0)),
                Field::from_arr("nums", vec![Field::from_digit("item", 300)]),
            ]),
        ];

        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(records_out.len(), 2);

        // Row 0: every scalar type plus a two-element array.
        assert_eq!(
            records_out[0].get_value("chars"),
            Some(&Value::Chars(FValueStr::from("hello")))
        );
        assert_eq!(records_out[0].get_value("digit"), Some(&Value::Digit(42)));
        assert_eq!(
            records_out[0].get_value("float"),
            Some(&Value::Float(9.876))
        );
        assert_eq!(records_out[0].get_value("bool"), Some(&Value::Bool(true)));
        assert_eq!(records_out[0].get_value("time"), Some(&Value::Time(ndt)));
        assert_eq!(records_out[0].get_value("ip"), Some(&Value::IpAddr(ip)));
        assert_eq!(
            records_out[0].get_value("hex"),
            Some(&Value::Hex(HexT(0xDEAD)))
        );

        if let Some(Value::Array(items)) = records_out[0].get_value("nums") {
            assert_eq!(items.len(), 2);
            assert_eq!(items[0].get_value(), &Value::Digit(100));
            assert_eq!(items[1].get_value(), &Value::Digit(200));
        } else {
            panic!("expected Array value for 'nums'");
        }

        // Row 1: CIDR network and the zero hex value (edge case for "0x0" text).
        assert_eq!(records_out[1].get_value("ip"), Some(&Value::IpNet(net)));
        assert_eq!(records_out[1].get_value("hex"), Some(&Value::Hex(HexT(0))));
    }

    /// A null cell is dropped from the decoded record, so the field reads as `None`.
    #[test]
    fn roundtrip_with_nulls() {
        let fds = vec![
            FieldDef::new("name", WpDataType::Chars),
            FieldDef::new("opt_digit", WpDataType::Digit),
        ];

        let records_in = vec![
            make_record(vec![
                Field::from_chars("name", "row1"),
                Field::from_digit("opt_digit", 100),
            ]),
            make_record(vec![
                Field::from_chars("name", "row2"),
            ]),
        ];

        let batch = records_to_batch(&records_in, &fds).unwrap();
        let records_out = batch_to_records(&batch, &fds).unwrap();

        assert_eq!(records_out.len(), 2);
        assert_eq!(
            records_out[0].get_value("opt_digit"),
            Some(&Value::Digit(100))
        );
        assert_eq!(records_out[1].get_value("opt_digit"), None);
    }
}