Skip to main content

fluss/row/
decimal.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::error::{Error, Result};
19use bigdecimal::num_bigint::BigInt;
20use bigdecimal::num_traits::Zero;
21use bigdecimal::{BigDecimal, RoundingMode};
22use std::fmt;
23
24#[cfg(test)]
25use std::str::FromStr;
26
/// Maximum decimal precision that can be stored compactly as a single i64.
///
/// Values with precision > MAX_COMPACT_PRECISION require byte array storage.
/// Any value of at most 18 decimal digits fits in an `i64` (`i64::MAX` has 19 digits),
/// so the compact form is always representable up to this bound.
pub const MAX_COMPACT_PRECISION: u32 = 18;
30
/// An internal data structure representing a decimal value with fixed precision and scale.
///
/// This data structure is immutable and stores decimal values in a compact representation
/// (as a long value) if values are small enough (precision ≤ 18).
///
/// Every constructor in this file populates at least one of `long_val` / `decimal_val`:
/// `from_unscaled_long` sets only `long_val`, while `from_big_decimal` always sets
/// `decimal_val` and additionally sets `long_val` for compact precisions.
///
/// Matches Java's org.apache.fluss.row.Decimal class.
#[derive(Debug, Clone, serde::Serialize)]
pub struct Decimal {
    /// Declared precision: the maximum number of digits the unscaled value may have.
    precision: u32,
    /// Number of digits to the right of the decimal point.
    scale: u32,
    /// If precision <= MAX_COMPACT_PRECISION, this holds the unscaled value.
    long_val: Option<i64>,
    /// BigDecimal representation (may be cached); `None` when the value was built
    /// from an unscaled long, in which case it is derived on demand.
    decimal_val: Option<BigDecimal>,
}
46
47impl Decimal {
    /// Returns the declared precision of this Decimal.
    ///
    /// This is the maximum number of digits the unscaled value may have, exactly as
    /// supplied at construction time. It is not the digit count of the current value:
    /// a Decimal built from unscaled `12345` with precision 10 reports 10.
    pub fn precision(&self) -> u32 {
        self.precision
    }
54
    /// Returns the scale of this Decimal, i.e. the number of digits to the right of the
    /// decimal point (unscaled `12345` with scale 2 represents `123.45`).
    pub fn scale(&self) -> u32 {
        self.scale
    }
59
60    /// Returns whether the decimal value is small enough to be stored in a long.
61    pub fn is_compact(&self) -> bool {
62        self.precision <= MAX_COMPACT_PRECISION
63    }
64
65    /// Returns whether a given precision can be stored compactly.
66    pub fn is_compact_precision(precision: u32) -> bool {
67        precision <= MAX_COMPACT_PRECISION
68    }
69
70    /// Converts this Decimal into a BigDecimal.
71    pub fn to_big_decimal(&self) -> BigDecimal {
72        if let Some(bd) = &self.decimal_val {
73            bd.clone()
74        } else if let Some(long_val) = self.long_val {
75            BigDecimal::new(BigInt::from(long_val), self.scale as i64)
76        } else {
77            // Should never happen - we always have one representation
78            BigDecimal::new(BigInt::from(0), self.scale as i64)
79        }
80    }
81
82    /// Returns a long describing the unscaled value of this Decimal.
83    pub fn to_unscaled_long(&self) -> Result<i64> {
84        if let Some(long_val) = self.long_val {
85            Ok(long_val)
86        } else {
87            // Extract unscaled value from BigDecimal
88            let bd = self.to_big_decimal();
89            let (unscaled, _) = bd.as_bigint_and_exponent();
90            unscaled.try_into().map_err(|_| Error::IllegalArgument {
91                message: format!(
92                    "Decimal unscaled value does not fit in i64: precision={}",
93                    self.precision
94                ),
95            })
96        }
97    }
98
99    /// Returns a byte array describing the unscaled value of this Decimal.
100    pub fn to_unscaled_bytes(&self) -> Vec<u8> {
101        let bd = self.to_big_decimal();
102        let (unscaled, _) = bd.as_bigint_and_exponent();
103        unscaled.to_signed_bytes_be()
104    }
105
106    /// Creates a Decimal from Arrow's Decimal128 representation.
107    // TODO: For compact decimals with matching scale we may call from_unscaled_long
108    pub fn from_arrow_decimal128(
109        i128_val: i128,
110        arrow_scale: i64,
111        precision: u32,
112        scale: u32,
113    ) -> Result<Self> {
114        let bd = BigDecimal::new(BigInt::from(i128_val), arrow_scale);
115        Self::from_big_decimal(bd, precision, scale)
116    }
117
118    /// Creates an instance of Decimal from a BigDecimal with the given precision and scale.
119    ///
120    /// The returned decimal value may be rounded to have the desired scale. The precision
121    /// will be checked. If the precision overflows, an error is returned.
122    pub fn from_big_decimal(bd: BigDecimal, precision: u32, scale: u32) -> Result<Self> {
123        // Rescale to the target scale with HALF_UP rounding (matches Java)
124        let scaled = bd.with_scale_round(scale as i64, RoundingMode::HalfUp);
125
126        // Extract unscaled value
127        let (unscaled, exp) = scaled.as_bigint_and_exponent();
128
129        // Sanity check that scale matches
130        debug_assert_eq!(
131            exp, scale as i64,
132            "Scaled decimal exponent ({exp}) != expected scale ({scale})"
133        );
134
135        let actual_precision = Self::compute_precision(&unscaled);
136        if actual_precision > precision as usize {
137            return Err(Error::IllegalArgument {
138                message: format!(
139                    "Decimal precision overflow: value has {actual_precision} digits but precision is {precision} (value: {scaled})"
140                ),
141            });
142        }
143
144        // Compute compact representation if possible
145        let long_val = if precision <= MAX_COMPACT_PRECISION {
146            Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument {
147                message: format!(
148                    "Decimal mantissa exceeds i64 range for compact precision {precision}: unscaled={unscaled} (value={scaled})"
149                ),
150            })?)
151        } else {
152            None
153        };
154
155        Ok(Decimal {
156            precision,
157            scale,
158            long_val,
159            decimal_val: Some(scaled),
160        })
161    }
162
163    /// Creates an instance of Decimal from an unscaled long value with the given precision and scale.
164    pub fn from_unscaled_long(unscaled_long: i64, precision: u32, scale: u32) -> Result<Self> {
165        if precision > MAX_COMPACT_PRECISION {
166            return Err(Error::IllegalArgument {
167                message: format!(
168                    "Precision {precision} exceeds MAX_COMPACT_PRECISION ({MAX_COMPACT_PRECISION})"
169                ),
170            });
171        }
172
173        let actual_precision = Self::compute_precision(&BigInt::from(unscaled_long));
174        if actual_precision > precision as usize {
175            return Err(Error::IllegalArgument {
176                message: format!(
177                    "Decimal precision overflow: unscaled value has {actual_precision} digits but precision is {precision}"
178                ),
179            });
180        }
181
182        Ok(Decimal {
183            precision,
184            scale,
185            long_val: Some(unscaled_long),
186            decimal_val: None,
187        })
188    }
189
190    /// Creates an instance of Decimal from an unscaled byte array with the given precision and scale.
191    pub fn from_unscaled_bytes(unscaled_bytes: &[u8], precision: u32, scale: u32) -> Result<Self> {
192        let unscaled = BigInt::from_signed_bytes_be(unscaled_bytes);
193        let bd = BigDecimal::new(unscaled, scale as i64);
194        Self::from_big_decimal(bd, precision, scale)
195    }
196
197    /// Computes the precision of a decimal's unscaled value, matching Java's BigDecimal.precision().
198    pub fn compute_precision(unscaled: &BigInt) -> usize {
199        if unscaled.is_zero() {
200            return 1;
201        }
202
203        // Count ALL digits in the unscaled value (matches Java's BigDecimal.precision())
204        // For bounded precision (≤ 38 digits), string conversion is cheap and simple.
205        unscaled.magnitude().to_str_radix(10).len()
206    }
207}
208
209impl fmt::Display for Decimal {
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        write!(f, "{}", self.to_big_decimal())
212    }
213}
214
215// Manual implementations of comparison traits to ignore cached fields
216impl PartialEq for Decimal {
217    fn eq(&self, other: &Self) -> bool {
218        // Use numeric equality like Java's Decimal.equals() which delegates to compareTo.
219        // This means 1.0 (scale=1) equals 1.00 (scale=2).
220        self.cmp(other) == std::cmp::Ordering::Equal
221    }
222}
223
// Marker impl: `eq` is defined through `Ord::cmp`, so equality follows the total order.
impl Eq for Decimal {}
225
226impl PartialOrd for Decimal {
227    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
228        Some(self.cmp(other))
229    }
230}
231
232impl Ord for Decimal {
233    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
234        // If both are compact and have the same scale, compare directly
235        if self.is_compact() && other.is_compact() && self.scale == other.scale {
236            self.long_val.cmp(&other.long_val)
237        } else {
238            // Otherwise, compare as BigDecimal
239            self.to_big_decimal().cmp(&other.to_big_decimal())
240        }
241    }
242}
243
244impl std::hash::Hash for Decimal {
245    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
246        // Hash the BigDecimal representation.
247        //
248        // IMPORTANT: Unlike Java's BigDecimal, Rust's bigdecimal crate normalizes
249        // before hashing, so hash(1.0) == hash(1.00). Combined with our numeric
250        // equality (1.0 == 1.00), this CORRECTLY satisfies the hash/equals contract.
251        //
252        // This is BETTER than Java's implementation which has a hash/equals violation:
253        // - Java: equals(1.0, 1.00) = true, but hashCode(1.0) != hashCode(1.00)
254        // - Rust: equals(1.0, 1.00) = true, and hash(1.0) == hash(1.00) ✓
255        //
256        // Result: HashMap/HashSet will work correctly even if you create Decimals
257        // with different scales for the same numeric value (though this is rare in
258        // practice since decimals are schema-driven with fixed precision/scale).
259        self.to_big_decimal().hash(state);
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266
267    #[test]
268    fn test_precision_calculation() {
269        // Zero is special case
270        assert_eq!(Decimal::compute_precision(&BigInt::from(0)), 1);
271
272        // Must count ALL digits including trailing zeros (matches Java BigDecimal.precision())
273        assert_eq!(Decimal::compute_precision(&BigInt::from(10)), 2);
274        assert_eq!(Decimal::compute_precision(&BigInt::from(100)), 3);
275        assert_eq!(Decimal::compute_precision(&BigInt::from(12300)), 5);
276        assert_eq!(
277            Decimal::compute_precision(&BigInt::from(10000000000i64)),
278            11
279        );
280
281        // Test the case: value=1, scale=10 → unscaled=10000000000 (11 digits)
282        let bd = BigDecimal::new(BigInt::from(1), 0);
283        assert!(
284            Decimal::from_big_decimal(bd.clone(), 1, 10).is_err(),
285            "Should reject: unscaled 10000000000 has 11 digits, precision=1 is too small"
286        );
287        assert!(
288            Decimal::from_big_decimal(bd, 11, 10).is_ok(),
289            "Should accept with correct precision=11"
290        );
291    }
292
293    /// Test precision validation boundaries
294    #[test]
295    fn test_precision_validation() {
296        let test_cases = vec![
297            (10i64, 1, 2),            // 1.0 → unscaled: 10 (2 digits)
298            (100i64, 2, 3),           // 1.00 → unscaled: 100 (3 digits)
299            (10000000000i64, 10, 11), // 1.0000000000 → unscaled: 10000000000 (11 digits)
300        ];
301
302        for (unscaled, scale, min_precision) in test_cases {
303            let bd = BigDecimal::new(BigInt::from(unscaled), scale as i64);
304
305            // Reject if precision too small
306            assert!(Decimal::from_big_decimal(bd.clone(), min_precision - 1, scale).is_err());
307            // Accept with correct precision
308            assert!(Decimal::from_big_decimal(bd, min_precision, scale).is_ok());
309        }
310
311        // i64::MAX has 19 digits, should reject with precision=5
312        let bd = BigDecimal::new(BigInt::from(i64::MAX), 0);
313        assert!(Decimal::from_big_decimal(bd, 5, 0).is_err());
314    }
315
316    /// Test creation and basic operations for both compact and non-compact decimals
317    #[test]
318    fn test_creation_and_representation() {
319        // Compact (precision ≤ 18): from unscaled long
320        let compact = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
321        assert_eq!(compact.precision(), 10);
322        assert_eq!(compact.scale(), 2);
323        assert!(compact.is_compact());
324        assert_eq!(compact.to_unscaled_long().unwrap(), 12345);
325        assert_eq!(compact.to_big_decimal().to_string(), "123.45");
326
327        // Non-compact (precision > 18): from BigDecimal
328        let bd = BigDecimal::new(BigInt::from(12345), 0);
329        let non_compact = Decimal::from_big_decimal(bd, 28, 0).unwrap();
330        assert_eq!(non_compact.precision(), 28);
331        assert!(!non_compact.is_compact());
332        assert_eq!(
333            non_compact.to_unscaled_bytes(),
334            BigInt::from(12345).to_signed_bytes_be()
335        );
336
337        // Test compact boundary
338        assert!(Decimal::is_compact_precision(18));
339        assert!(!Decimal::is_compact_precision(19));
340
341        // Test rounding during creation
342        let bd = BigDecimal::new(BigInt::from(12345), 3); // 12.345
343        let rounded = Decimal::from_big_decimal(bd, 10, 2).unwrap();
344        assert_eq!(rounded.to_unscaled_long().unwrap(), 1235); // 12.35
345    }
346
347    /// Test serialization round-trip (unscaled bytes)
348    #[test]
349    fn test_serialization_roundtrip() {
350        // Compact decimal
351        let bd1 = BigDecimal::new(BigInt::from(1314567890123i64), 5); // 13145678.90123
352        let decimal1 = Decimal::from_big_decimal(bd1.clone(), 15, 5).unwrap();
353        let (unscaled1, _) = bd1.as_bigint_and_exponent();
354        let from_bytes1 =
355            Decimal::from_unscaled_bytes(&unscaled1.to_signed_bytes_be(), 15, 5).unwrap();
356        assert_eq!(from_bytes1, decimal1);
357        assert_eq!(
358            from_bytes1.to_unscaled_bytes(),
359            unscaled1.to_signed_bytes_be()
360        );
361
362        // Non-compact decimal
363        let bd2 = BigDecimal::new(BigInt::from(12345678900987654321i128), 10);
364        let decimal2 = Decimal::from_big_decimal(bd2.clone(), 23, 10).unwrap();
365        let (unscaled2, _) = bd2.as_bigint_and_exponent();
366        let from_bytes2 =
367            Decimal::from_unscaled_bytes(&unscaled2.to_signed_bytes_be(), 23, 10).unwrap();
368        assert_eq!(from_bytes2, decimal2);
369        assert_eq!(
370            from_bytes2.to_unscaled_bytes(),
371            unscaled2.to_signed_bytes_be()
372        );
373    }
374
375    /// Test numeric equality and ordering (matches Java semantics)
376    #[test]
377    fn test_equality_and_ordering() {
378        // Same value, different precision/scale → should be equal (numeric equality)
379        let d1 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(10), 1), 2, 1).unwrap(); // 1.0
380        let d2 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(100), 2), 3, 2).unwrap(); // 1.00
381        assert_eq!(d1, d2, "Numeric equality: 1.0 == 1.00");
382        assert_eq!(d1.cmp(&d2), std::cmp::Ordering::Equal);
383
384        // Test ordering with positive values
385        let small = Decimal::from_unscaled_long(10, 5, 0).unwrap();
386        let large = Decimal::from_unscaled_long(15, 5, 0).unwrap();
387        assert!(small < large);
388        assert_eq!(small.cmp(&large), std::cmp::Ordering::Less);
389
390        // Test ordering with negative values
391        let negative_large = Decimal::from_unscaled_long(-10, 5, 0).unwrap(); // -10
392        let negative_small = Decimal::from_unscaled_long(-15, 5, 0).unwrap(); // -15
393        assert!(negative_small < negative_large); // -15 < -10
394        assert_eq!(
395            negative_small.cmp(&negative_large),
396            std::cmp::Ordering::Less
397        );
398
399        // Test ordering with mixed positive and negative
400        let positive = Decimal::from_unscaled_long(5, 5, 0).unwrap();
401        let negative = Decimal::from_unscaled_long(-5, 5, 0).unwrap();
402        assert!(negative < positive);
403        assert_eq!(negative.cmp(&positive), std::cmp::Ordering::Less);
404
405        // Test clone and round-trip equality
406        let original = Decimal::from_unscaled_long(10, 5, 0).unwrap();
407        assert_eq!(original.clone(), original);
408        assert_eq!(
409            Decimal::from_unscaled_long(original.to_unscaled_long().unwrap(), 5, 0).unwrap(),
410            original
411        );
412    }
413
414    /// Test hash/equals contract (Rust implementation is correct, unlike Java)
415    #[test]
416    fn test_hash_equals_contract() {
417        use std::collections::hash_map::DefaultHasher;
418        use std::hash::{Hash, Hasher};
419
420        let d1 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(10), 1), 2, 1).unwrap(); // 1.0
421        let d2 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(100), 2), 3, 2).unwrap(); // 1.00
422
423        // Numeric equality
424        assert_eq!(d1, d2);
425
426        // Hash contract: if a == b, then hash(a) == hash(b)
427        let mut hasher1 = DefaultHasher::new();
428        d1.hash(&mut hasher1);
429        let hash1 = hasher1.finish();
430
431        let mut hasher2 = DefaultHasher::new();
432        d2.hash(&mut hasher2);
433        let hash2 = hasher2.finish();
434
435        assert_eq!(hash1, hash2, "Equal decimals must have equal hashes");
436
437        // Verify HashMap works correctly (this would fail in Java due to their hash/equals bug)
438        let mut map = std::collections::HashMap::new();
439        map.insert(d1.clone(), "value");
440        assert_eq!(map.get(&d2), Some(&"value"));
441    }
442
443    /// Test edge cases: zeros, large numbers, rescaling
444    #[test]
445    fn test_edge_cases() {
446        // Zero handling (compact and non-compact)
447        let zero_compact = Decimal::from_unscaled_long(0, 5, 2).unwrap();
448        assert_eq!(
449            zero_compact.to_big_decimal(),
450            BigDecimal::new(BigInt::from(0), 2)
451        );
452
453        let zero_non_compact =
454            Decimal::from_big_decimal(BigDecimal::new(BigInt::from(0), 2), 20, 2).unwrap();
455        assert_eq!(
456            zero_non_compact.to_big_decimal(),
457            BigDecimal::new(BigInt::from(0), 2)
458        );
459
460        // Large number (39 digits)
461        let large_bd = BigDecimal::from_str("123456789012345678901234567890123456789").unwrap();
462        let large = Decimal::from_big_decimal(large_bd, 39, 0).unwrap();
463        let double_val = large.to_big_decimal().to_string().parse::<f64>().unwrap();
464        assert!((double_val - 1.2345678901234568E38).abs() < 0.01);
465
466        // Rescaling: 5.0 (scale=1) → 5.00 (scale=2)
467        let d1 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(50), 1), 10, 1).unwrap();
468        let d2 = Decimal::from_big_decimal(d1.to_big_decimal(), 10, 2).unwrap();
469        assert_eq!(d2.to_big_decimal().to_string(), "5.00");
470        assert_eq!(d2.scale(), 2);
471    }
472}