1use crate::{DataType, DriverError, DriverResult, NGValue, Transform};
2use bytes::Bytes;
3use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
4use std::sync::Arc;
5
6pub struct ValueCodec;
9
10impl ValueCodec {
11 const F64_EXACT_INT_MAX_U64: u64 = 9_007_199_254_740_992; #[inline]
19 pub fn logical_to_wire_value(
20 value: &NGValue,
21 logical_dt: DataType,
22 wire_dt: DataType,
23 t: &Transform,
24 ) -> DriverResult<NGValue> {
25 if !value.validate_datatype(logical_dt) {
26 return Err(DriverError::ValidationError(format!(
27 "type mismatch: expected logical {:?}, got {:?}",
28 logical_dt,
29 value.data_type()
30 )));
31 }
32
33 if !logical_dt.is_numeric() || !wire_dt.is_numeric() {
34 return Self::non_numeric_logical_to_wire(value, logical_dt, wire_dt, t);
35 }
36
37 if t.is_identity_numeric() {
39 return Self::numeric_identity_logical_to_wire(value, logical_dt, wire_dt);
40 }
41
42 let y = Self::numeric_value_to_f64_strict(value, logical_dt)?;
44 let x = Self::inverse_transform_f64_strict(y, t)?;
45 Self::box_f64_to_wire_strict(x, wire_dt)
46 }
47
48 #[inline]
61 pub fn wire_to_logical_value(
62 value: &NGValue,
63 wire_dt: DataType,
64 logical_dt: DataType,
65 t: &Transform,
66 ) -> DriverResult<NGValue> {
67 if !value.validate_datatype(wire_dt) {
68 return Err(DriverError::ValidationError(format!(
69 "type mismatch: expected wire {:?}, got {:?}",
70 wire_dt,
71 value.data_type()
72 )));
73 }
74
75 if !logical_dt.is_numeric() {
78 if !t.is_identity_numeric() {
79 return Err(DriverError::ConfigurationError(format!(
80 "non-numeric uplink cannot apply numeric transform: wire={:?}, logical={:?}, transform={:?}",
81 wire_dt, logical_dt, t
82 )));
83 }
84 if wire_dt != logical_dt {
85 return Err(DriverError::ValidationError(format!(
86 "non-numeric uplink does not support wire/logical datatype mapping: wire={:?}, logical={:?}",
87 wire_dt, logical_dt
88 )));
89 }
90 return Ok(value.clone());
91 }
92
93 if t.is_identity_numeric() && wire_dt == logical_dt {
95 return Ok(value.clone());
96 }
97
98 if !t.is_identity_numeric() {
101 match &value {
102 NGValue::UInt64(v) => {
103 if *v > Self::F64_EXACT_INT_MAX_U64 {
104 return Err(DriverError::ValidationError(format!(
105 "numeric uplink value too large for safe transform (UInt64 > 2^53): {v}"
106 )));
107 }
108 }
109 NGValue::Int64(v) => {
110 if v.unsigned_abs() > Self::F64_EXACT_INT_MAX_U64 {
111 return Err(DriverError::ValidationError(format!(
112 "numeric uplink value too large for safe transform (Int64 magnitude > 2^53): {v}"
113 )));
114 }
115 }
116 _ => {}
117 }
118 }
119
120 let out = match value {
121 NGValue::Boolean(b) => Self::coerce_bool_to_value(*b, logical_dt, t),
122 NGValue::Int8(v) => Self::coerce_i64_to_value(*v as i64, logical_dt, t),
123 NGValue::UInt8(v) => Self::coerce_u64_to_value(*v as u64, logical_dt, t),
124 NGValue::Int16(v) => Self::coerce_i64_to_value(*v as i64, logical_dt, t),
125 NGValue::UInt16(v) => Self::coerce_u64_to_value(*v as u64, logical_dt, t),
126 NGValue::Int32(v) => Self::coerce_i64_to_value(*v as i64, logical_dt, t),
127 NGValue::UInt32(v) => Self::coerce_u64_to_value(*v as u64, logical_dt, t),
128 NGValue::Int64(v) => Self::coerce_i64_to_value(*v, logical_dt, t),
129 NGValue::UInt64(v) => Self::coerce_u64_to_value(*v, logical_dt, t),
130 NGValue::Float32(v) => Self::coerce_f64_to_value(*v as f64, logical_dt, t),
131 NGValue::Float64(v) => Self::coerce_f64_to_value(*v, logical_dt, t),
132 NGValue::Timestamp(ms) => Self::coerce_i64_to_value(*ms, logical_dt, t),
134 NGValue::String(_) | NGValue::Binary(_) => f64::try_from(value)
140 .ok()
141 .and_then(|n| Self::coerce_f64_to_value(n, logical_dt, t)),
142 };
143
144 out.ok_or(
145 DriverError::ValidationError(format!(
146 "uplink wire->logical coercion failed: wire={wire_dt:?}, logical={logical_dt:?}, value={value:?}, transform={t:?}"
147 ))
148 )
149 }
150
151 #[inline]
152 fn non_numeric_logical_to_wire(
153 value: &NGValue,
154 logical_dt: DataType,
155 wire_dt: DataType,
156 t: &Transform,
157 ) -> DriverResult<NGValue> {
158 if !t.is_identity_numeric() {
159 return Err(DriverError::ConfigurationError(format!(
160 "non-numeric downlink cannot apply numeric transform: logical={:?}, wire={:?}, transform={:?}",
161 logical_dt, wire_dt, t
162 )));
163 }
164 if logical_dt != wire_dt {
165 return Err(DriverError::ValidationError(format!(
166 "non-numeric downlink does not support logical/wire datatype mapping: logical={:?}, wire={:?}",
167 logical_dt, wire_dt
168 )));
169 }
170 Ok(value.clone())
171 }
172
173 #[inline]
174 fn numeric_identity_logical_to_wire(
175 value: &NGValue,
176 logical_dt: DataType,
177 wire_dt: DataType,
178 ) -> DriverResult<NGValue> {
179 if logical_dt == wire_dt {
180 return Ok(value.clone());
181 }
182 let cast_err = |e: crate::NGValueCastError| {
183 DriverError::ValidationError(format!(
184 "downlink datatype cast failed: logical={logical_dt:?} -> wire={wire_dt:?}, error={e}",
185 ))
186 };
187 match wire_dt {
188 DataType::Int8 => Ok(NGValue::Int8(i8::try_from(value).map_err(cast_err)?)),
189 DataType::UInt8 => Ok(NGValue::UInt8(u8::try_from(value).map_err(cast_err)?)),
190 DataType::Int16 => Ok(NGValue::Int16(i16::try_from(value).map_err(cast_err)?)),
191 DataType::UInt16 => Ok(NGValue::UInt16(u16::try_from(value).map_err(cast_err)?)),
192 DataType::Int32 => Ok(NGValue::Int32(i32::try_from(value).map_err(cast_err)?)),
193 DataType::UInt32 => Ok(NGValue::UInt32(u32::try_from(value).map_err(cast_err)?)),
194 DataType::Int64 => Ok(NGValue::Int64(i64::try_from(value).map_err(cast_err)?)),
195 DataType::UInt64 => Ok(NGValue::UInt64(u64::try_from(value).map_err(cast_err)?)),
196 DataType::Float32 => Ok(NGValue::Float32(f32::try_from(value).map_err(cast_err)?)),
197 DataType::Float64 => Ok(NGValue::Float64(f64::try_from(value).map_err(cast_err)?)),
198 _ => Err(DriverError::ConfigurationError(format!(
199 "unsupported wire numeric data type: {wire_dt:?}",
200 ))),
201 }
202 }
203
204 #[inline]
205 fn numeric_value_to_f64_strict(value: &NGValue, logical_dt: DataType) -> DriverResult<f64> {
206 let y = match logical_dt {
207 DataType::UInt64 => {
208 let v = u64::try_from(value).map_err(|e| {
209 DriverError::ValidationError(format!(
210 "numeric downlink value conversion failed: logical={logical_dt:?}, actual={:?}, error={e}",
211 value.data_type()
212 ))
213 })?;
214 if v > Self::F64_EXACT_INT_MAX_U64 {
215 return Err(DriverError::ValidationError(format!(
216 "numeric downlink value too large for safe transform (UInt64 > 2^53): {v}"
217 )));
218 }
219 v as f64
220 }
221 DataType::Int64 => {
222 let v = i64::try_from(value).map_err(|e| {
223 DriverError::ValidationError(format!(
224 "numeric downlink value conversion failed: logical={logical_dt:?}, actual={:?}, error={e}",
225 value.data_type()
226 ))
227 })?;
228 if v.unsigned_abs() > Self::F64_EXACT_INT_MAX_U64 {
229 return Err(DriverError::ValidationError(format!(
230 "numeric downlink value too large for safe transform (Int64 magnitude > 2^53): {v}"
231 )));
232 }
233 v as f64
234 }
235 _ => f64::try_from(value).map_err(|e| {
236 DriverError::ValidationError(format!(
237 "numeric downlink value conversion failed: logical={:?}, actual={:?}, error={e}",
238 logical_dt,
239 value.data_type()
240 ))
241 })?,
242 };
243 if y.is_finite() {
244 Ok(y)
245 } else {
246 Err(DriverError::ValidationError(
247 "numeric downlink value must be finite".to_string(),
248 ))
249 }
250 }
251
252 #[inline]
253 fn inverse_transform_f64_strict(y: f64, t: &Transform) -> DriverResult<f64> {
254 if matches!(t.transform_scale, Some(s) if s == 0.0) {
256 return Err(DriverError::ConfigurationError(
257 "transform_scale must not be 0.0 for downlink inverse transform".to_string(),
258 ));
259 }
260 let x = t.inverse_f64(y);
261 if x.is_finite() {
262 Ok(x)
263 } else {
264 Err(DriverError::ValidationError(
265 "numeric downlink inverse transform produced non-finite value".to_string(),
266 ))
267 }
268 }
269
270 #[inline]
271 fn box_f64_to_wire_strict(x: f64, wire_dt: DataType) -> DriverResult<NGValue> {
272 let out = match wire_dt {
273 DataType::Boolean | DataType::String | DataType::Binary | DataType::Timestamp => {
274 return Err(DriverError::ConfigurationError(format!(
275 "wire datatype must be numeric here, got {wire_dt:?}"
276 )))
277 }
278 DataType::Int8 => {
279 let r = x.round();
280 (r >= i8::MIN as f64 && r <= i8::MAX as f64).then(|| NGValue::Int8(r as i8))
281 }
282 DataType::UInt8 => {
283 let r = x.round();
284 (r >= 0.0 && r <= u8::MAX as f64).then(|| NGValue::UInt8(r as u8))
285 }
286 DataType::Int16 => {
287 let r = x.round();
288 (r >= i16::MIN as f64 && r <= i16::MAX as f64).then(|| NGValue::Int16(r as i16))
289 }
290 DataType::UInt16 => {
291 let r = x.round();
292 (r >= 0.0 && r <= u16::MAX as f64).then(|| NGValue::UInt16(r as u16))
293 }
294 DataType::Int32 => {
295 let r = x.round();
296 (r >= i32::MIN as f64 && r <= i32::MAX as f64).then(|| NGValue::Int32(r as i32))
297 }
298 DataType::UInt32 => {
299 let r = x.round();
300 (r >= 0.0 && r <= u32::MAX as f64).then(|| NGValue::UInt32(r as u32))
301 }
302 DataType::Int64 => {
303 let r = x.round();
304 if (r < -(Self::F64_EXACT_INT_MAX_U64 as f64)
305 || r > (Self::F64_EXACT_INT_MAX_U64 as f64))
306 || (r < i64::MIN as f64 || r > i64::MAX as f64)
307 {
308 None
309 } else {
310 Some(NGValue::Int64(r as i64))
311 }
312 }
313 DataType::UInt64 => {
314 let r = x.round();
315 if r < 0.0 || r > (Self::F64_EXACT_INT_MAX_U64 as f64) {
316 None
317 } else {
318 Some(NGValue::UInt64(r as u64))
319 }
320 }
321 DataType::Float32 => Some(NGValue::Float32(x as f32)),
322 DataType::Float64 => Some(NGValue::Float64(x)),
323 };
324
325 out.ok_or(DriverError::ValidationError(format!(
326 "downlink value out of range after inverse: wire={:?}, value={x}",
327 wire_dt
328 )))
329 }
330
331 #[inline]
332 pub fn apply_transform_f64(x: f64, t: &Transform) -> f64 {
333 t.apply_f64(x)
334 }
335
336 #[inline]
337 fn should_apply_numeric_transform(expected: DataType, t: &Transform) -> bool {
338 expected.is_numeric() && !t.is_identity_numeric()
339 }
340
341 #[inline]
346 fn apply_transform_f64_if_needed(x: f64, expected: DataType, t: &Transform) -> f64 {
347 if Self::should_apply_numeric_transform(expected, t) {
348 Self::apply_transform_f64(x, t)
349 } else {
350 x
351 }
352 }
353
354 #[inline]
358 fn coerce_f64_to_value_after_transform(value: f64, expected: DataType) -> Option<NGValue> {
359 match expected {
360 DataType::Boolean => {
361 if !value.is_finite() {
362 None
363 } else {
364 Some(NGValue::Boolean(value != 0.0))
365 }
366 }
367 DataType::Int8 => {
368 let v = value.round();
369 if !v.is_finite() {
370 return None;
371 }
372 if v >= i8::MIN as f64 && v <= i8::MAX as f64 {
373 Some(NGValue::Int8(v as i8))
374 } else {
375 None
376 }
377 }
378 DataType::UInt8 => {
379 let v = value.round();
380 if !v.is_finite() {
381 return None;
382 }
383 if v >= 0.0 && v <= u8::MAX as f64 {
384 Some(NGValue::UInt8(v as u8))
385 } else {
386 None
387 }
388 }
389 DataType::Int16 => {
390 let v = value.round();
391 if !v.is_finite() {
392 return None;
393 }
394 if v >= i16::MIN as f64 && v <= i16::MAX as f64 {
395 Some(NGValue::Int16(v as i16))
396 } else {
397 None
398 }
399 }
400 DataType::UInt16 => {
401 let v = value.round();
402 if !v.is_finite() {
403 return None;
404 }
405 if v >= 0.0 && v <= u16::MAX as f64 {
406 Some(NGValue::UInt16(v as u16))
407 } else {
408 None
409 }
410 }
411 DataType::Int32 => {
412 let v = value.round();
413 if !v.is_finite() {
414 return None;
415 }
416 if v >= i32::MIN as f64 && v <= i32::MAX as f64 {
417 Some(NGValue::Int32(v as i32))
418 } else {
419 None
420 }
421 }
422 DataType::UInt32 => {
423 let v = value.round();
424 if !v.is_finite() {
425 return None;
426 }
427 if v >= 0.0 && v <= u32::MAX as f64 {
428 Some(NGValue::UInt32(v as u32))
429 } else {
430 None
431 }
432 }
433 DataType::Int64 => {
434 let v = value.round();
435 if !v.is_finite() {
436 return None;
437 }
438 if v < i64::MIN as f64 || v > i64::MAX as f64 {
439 return None;
440 }
441 Some(NGValue::Int64(v as i64))
442 }
443 DataType::UInt64 => {
444 let v = value.round();
445 if !v.is_finite() {
446 return None;
447 }
448 if v < 0.0 || v > u64::MAX as f64 {
449 return None;
450 }
451 Some(NGValue::UInt64(v as u64))
452 }
453 DataType::Float32 => Some(NGValue::Float32(value as f32)),
454 DataType::Float64 => Some(NGValue::Float64(value)),
455 DataType::String => Some(NGValue::String(Arc::<str>::from(value.to_string()))),
456 DataType::Binary => Some(NGValue::Binary(Bytes::copy_from_slice(
457 &value.to_be_bytes(),
458 ))),
459 DataType::Timestamp => {
460 let v = value.round();
463 if !v.is_finite() {
464 return None;
465 }
466 if v < i64::MIN as f64 || v > i64::MAX as f64 {
467 return None;
468 }
469 Some(NGValue::Timestamp(v as i64))
470 }
471 }
472 }
473
474 #[inline]
480 pub fn coerce_bool_to_value(value: bool, expected: DataType, t: &Transform) -> Option<NGValue> {
481 match expected {
482 DataType::Boolean => Some(NGValue::Boolean(value)),
483 DataType::String => Some(NGValue::String(Arc::<str>::from(if value {
484 "true"
485 } else {
486 "false"
487 }))),
488 DataType::Binary => Some(NGValue::Binary(Bytes::from_static(if value {
489 &[1u8; 1]
490 } else {
491 &[0u8; 1]
492 }))),
493 DataType::Timestamp => None,
494 _ => Self::coerce_f64_to_value(if value { 1.0 } else { 0.0 }, expected, t),
496 }
497 }
498
499 #[inline]
504 pub fn coerce_f64_to_value(value: f64, expected: DataType, t: &Transform) -> Option<NGValue> {
505 let value = Self::apply_transform_f64_if_needed(value, expected, t);
506 Self::coerce_f64_to_value_after_transform(value, expected)
507 }
508
509 #[inline]
517 pub fn coerce_u64_to_value(value: u64, expected: DataType, t: &Transform) -> Option<NGValue> {
518 if Self::should_apply_numeric_transform(expected, t) {
519 let v = Self::apply_transform_f64(value as f64, t);
520 return Self::coerce_f64_to_value_after_transform(v, expected);
521 }
522 match expected {
523 DataType::Boolean => Some(NGValue::Boolean(value != 0)),
524 DataType::UInt8 => u8::try_from(value).ok().map(NGValue::UInt8),
525 DataType::UInt16 => u16::try_from(value).ok().map(NGValue::UInt16),
526 DataType::UInt32 => u32::try_from(value).ok().map(NGValue::UInt32),
527 DataType::UInt64 => Some(NGValue::UInt64(value)),
528 DataType::Int8 => i8::try_from(value).ok().map(NGValue::Int8),
529 DataType::Int16 => i16::try_from(value).ok().map(NGValue::Int16),
530 DataType::Int32 => i32::try_from(value).ok().map(NGValue::Int32),
531 DataType::Int64 => i64::try_from(value).ok().map(NGValue::Int64),
532 DataType::Float32 => Some(NGValue::Float32(value as f32)),
533 DataType::Float64 => Some(NGValue::Float64(value as f64)),
534 DataType::String => Some(NGValue::String(Arc::<str>::from(value.to_string()))),
535 DataType::Binary => Some(NGValue::Binary(Bytes::copy_from_slice(
536 &value.to_be_bytes(),
537 ))),
538 DataType::Timestamp => {
539 if value > i64::MAX as u64 {
541 None
542 } else {
543 Some(NGValue::Timestamp(value as i64))
544 }
545 }
546 }
547 }
548
549 #[inline]
553 pub fn coerce_i64_to_value(value: i64, expected: DataType, t: &Transform) -> Option<NGValue> {
554 if Self::should_apply_numeric_transform(expected, t) {
555 let v = Self::apply_transform_f64(value as f64, t);
556 return Self::coerce_f64_to_value_after_transform(v, expected);
557 }
558 match expected {
559 DataType::Boolean => Some(NGValue::Boolean(value != 0)),
560 DataType::Int8 => i8::try_from(value).ok().map(NGValue::Int8),
561 DataType::Int16 => i16::try_from(value).ok().map(NGValue::Int16),
562 DataType::Int32 => i32::try_from(value).ok().map(NGValue::Int32),
563 DataType::Int64 => Some(NGValue::Int64(value)),
564 DataType::UInt8 => u8::try_from(value).ok().map(NGValue::UInt8),
565 DataType::UInt16 => u16::try_from(value).ok().map(NGValue::UInt16),
566 DataType::UInt32 => u32::try_from(value).ok().map(NGValue::UInt32),
567 DataType::UInt64 => u64::try_from(value).ok().map(NGValue::UInt64),
568 DataType::Float32 => Some(NGValue::Float32(value as f32)),
569 DataType::Float64 => Some(NGValue::Float64(value as f64)),
570 DataType::String => Some(NGValue::String(Arc::<str>::from(value.to_string()))),
571 DataType::Binary => Some(NGValue::Binary(Bytes::copy_from_slice(
572 &value.to_be_bytes(),
573 ))),
574 DataType::Timestamp => Some(NGValue::Timestamp(value)),
575 }
576 }
577
578 #[inline]
580 pub fn time_of_day_to_ms(t: NaiveTime) -> u64 {
581 (t.num_seconds_from_midnight() as u64) * 1000 + (t.nanosecond() / 1_000_000) as u64
582 }
583
584 #[inline]
585 pub fn duration_to_ms(d: Duration) -> i64 {
586 d.num_milliseconds()
587 }
588
589 #[inline]
590 pub fn date_to_epoch_ms(d: NaiveDate) -> Option<i64> {
591 let ndt = d.and_time(NaiveTime::from_hms_opt(0, 0, 0)?);
592 let dt = DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc);
593 Some(dt.timestamp_millis())
594 }
595
596 #[inline]
597 pub fn datetime_to_epoch_ms(ndt: NaiveDateTime) -> i64 {
598 let dt = DateTime::<Utc>::from_naive_utc_and_offset(ndt, Utc);
599 dt.timestamp_millis()
600 }
601
602 #[inline]
604 pub fn bytes_to_hex_string(bytes: &[u8]) -> String {
605 const LUT: &[u8; 16] = b"0123456789abcdef";
606 let mut out = String::with_capacity(2 + bytes.len() * 2);
607 out.push_str("0x");
608 for &b in bytes {
609 out.push(LUT[(b >> 4) as usize] as char);
610 out.push(LUT[(b & 0x0F) as usize] as char);
611 }
612 out
613 }
614
615 #[inline]
617 pub fn hex_string_to_bytes(s: &str) -> Option<Vec<u8>> {
618 let st = s.trim();
619 let hex = if st.starts_with("0x") || st.starts_with("0X") {
620 &st[2..]
621 } else {
622 st
623 };
624 if hex.is_empty() {
625 return Some(Vec::new());
626 }
627 let bytes = hex.as_bytes();
628 let mut out: Vec<u8> = Vec::with_capacity(bytes.len().div_ceil(2));
629 let mut i = 0usize;
630 while i + 1 < bytes.len() {
631 let hi = (bytes[i] as char).to_digit(16)? as u8;
632 let lo = (bytes[i + 1] as char).to_digit(16)? as u8;
633 out.push((hi << 4) | (lo & 0x0F));
634 i += 2;
635 }
636 if i < bytes.len() {
637 let hi = (bytes[i] as char).to_digit(16)? as u8;
638 out.push(hi << 4);
639 }
640 Some(out)
641 }
642
643 #[inline]
647 pub fn json_to_timestamp_ms(v: &serde_json::Value) -> Option<i64> {
648 if let Some(s) = v.as_str() {
649 if let Ok(dt) = DateTime::parse_from_rfc3339(s.trim()) {
650 return Some(dt.timestamp_millis());
651 }
652 }
653 if let Some(n) = v.as_i64() {
654 return Some(n);
655 }
656 if let Some(n) = v.as_u64() {
657 return Some(n as i64);
658 }
659 if let Some(n) = v.as_f64() {
660 if !n.is_finite() {
661 return None;
662 }
663 let r = n.round();
664 if r < i64::MIN as f64 || r > i64::MAX as f64 {
665 return None;
666 }
667 return Some(r as i64);
668 }
669 None
670 }
671}