cherry_cast/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
use std::sync::Arc;

use anyhow::{Context, Result};
use arrow::{
    array::{builder, Array, BinaryArray, Decimal256Array, RecordBatch, StringArray},
    compute::CastOptions,
    datatypes::{DataType, Field, Schema},
};
use ruint::aliases::U256;

/// Casts columns according to given (column name, target data type) pairs.
///
/// Returns error if casting a row fails and `allow_cast_fail` is set to `false`.
/// Writes `null` to output if casting a row fails and `allow_cast_fail` is set to `true`.
pub fn cast<S: AsRef<str>>(
    map: &[(S, DataType)],
    data: &RecordBatch,
    allow_cast_fail: bool,
) -> Result<RecordBatch> {
    let schema = cast_schema(map, data.schema_ref()).context("cast schema")?;

    let mut arrays = Vec::with_capacity(data.num_columns());

    // Arrow's `safe` flag means: on a per-row cast failure, emit null instead of
    // returning an error. That is exactly the `allow_cast_fail` contract above,
    // so the two must agree directly (the previous `!allow_cast_fail` inverted
    // the documented behavior).
    let cast_opt = CastOptions {
        safe: allow_cast_fail,
        ..Default::default()
    };

    for (col, field) in data.columns().iter().zip(data.schema_ref().fields().iter()) {
        let cast_target = map.iter().find(|x| x.0.as_ref() == field.name());

        let col = match cast_target {
            // `cast_with_options` already returns an `ArrayRef`; wrapping it in
            // another `Arc::new` (as before) produced a redundant double-Arc.
            Some(tgt) => arrow::compute::cast_with_options(col, &tgt.1, &cast_opt)
                .with_context(|| format!("Failed when casting column '{}'", field.name()))?,
            None => col.clone(),
        };

        arrays.push(col);
    }

    let batch = RecordBatch::try_new(Arc::new(schema), arrays).context("construct record batch")?;

    Ok(batch)
}

/// Casts column types according to given (column name, target data type) pairs.
pub fn cast_schema<S: AsRef<str>>(map: &[(S, DataType)], schema: &Schema) -> Result<Schema> {
    // Rebuild the field list, swapping in the mapped target type wherever an
    // entry matches the field name; all other fields are carried over as-is.
    let fields: Vec<_> = schema
        .fields()
        .iter()
        .map(|f| {
            match map.iter().find(|(name, _)| name.as_ref() == f.name()) {
                Some((_, ty)) => Arc::new(Field::new(f.name(), ty.clone(), f.is_nullable())),
                None => f.clone(),
            }
        })
        .collect();

    Ok(Schema::new(fields))
}

/// Hex-encodes every `Binary` column of the batch into a `Utf8` column,
/// optionally prefixing each value with `"0x"` (controlled by `PREFIXED`).
pub fn encode_hex_impl<const PREFIXED: bool>(data: &RecordBatch) -> Result<RecordBatch> {
    let schema = schema_binary_to_string(data.schema_ref());

    let columns: Vec<Arc<dyn Array>> = data
        .columns()
        .iter()
        .map(|col| -> Arc<dyn Array> {
            if col.data_type() == &DataType::Binary {
                let binary = col.as_any().downcast_ref::<BinaryArray>().unwrap();
                Arc::new(hex_encode_column::<PREFIXED>(binary))
            } else {
                col.clone()
            }
        })
        .collect();

    RecordBatch::try_new(Arc::new(schema), columns).context("construct arrow batch")
}

/// Hex-encodes a binary column into a string column; nulls are preserved.
pub fn hex_encode_column<const PREFIXED: bool>(col: &BinaryArray) -> StringArray {
    // Each byte becomes two hex chars; the small pad leaves room for the
    // optional "0x" prefixes in the common case.
    let mut out =
        builder::StringBuilder::with_capacity(col.len(), (col.value_data().len() + 2) * 2);

    for value in col.iter() {
        if let Some(bytes) = value {
            // TODO: avoid allocation here and use a scratch buffer to encode hex into or write to arrow buffer
            // directly somehow.
            let hex = faster_hex::hex_string(bytes);
            if PREFIXED {
                out.append_value(format!("0x{}", hex));
            } else {
                out.append_value(hex);
            }
        } else {
            out.append_null();
        }
    }

    out.finish()
}

/// Converts binary fields to string in the schema
///
/// Intended to be used with encode hex functions
pub fn schema_binary_to_string(schema: &Schema) -> Schema {
    let fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|f| {
            if f.data_type() == &DataType::Binary {
                // Only the type changes; name and nullability are preserved.
                Arc::new(Field::new(f.name().clone(), DataType::Utf8, f.is_nullable()))
            } else {
                f.clone()
            }
        })
        .collect();

    Schema::new(fields)
}

/// Converts decimal256 fields to binary in the schema
///
/// Intended to be used with u256_to_binary function
pub fn schema_decimal256_to_binary(schema: &Schema) -> Schema {
    let fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .map(|f| {
            // Only the exact Decimal256(76, 0) encoding used for U256 values
            // is remapped; any other field passes through unchanged.
            if f.data_type() == &DataType::Decimal256(76, 0) {
                Arc::new(Field::new(f.name().clone(), DataType::Binary, f.is_nullable()))
            } else {
                f.clone()
            }
        })
        .collect();

    Schema::new(fields)
}

/// Decodes a hex-encoded string column back into a binary column.
///
/// With `PREFIXED` set, the first two characters of each value (expected to be
/// `"0x"`) are skipped before decoding. Nulls are preserved; malformed hex or
/// a too-short prefixed value yields an error.
pub fn hex_decode_column<const PREFIXED: bool>(col: &StringArray) -> Result<BinaryArray> {
    // Two hex chars decode to one byte, so half the value data is a good
    // capacity estimate.
    let mut out = builder::BinaryBuilder::with_capacity(col.len(), col.value_data().len() / 2);

    for value in col.iter() {
        let Some(s) = value else {
            out.append_null();
            continue;
        };

        // TODO: this should be optimized by removing allocations if needed
        let bytes = s.as_bytes();
        let hex = if PREFIXED {
            bytes.get(2..).context("index into prefix hex encoded value")?
        } else {
            bytes
        };

        let mut dst = vec![0; (hex.len() + 1) / 2];
        faster_hex::hex_decode(hex, &mut dst).context("hex decode")?;

        out.append_value(dst);
    }

    Ok(out.finish())
}

/// Parses big-endian byte slices into a Decimal256 (U256) column.
///
/// Errors if a value does not fit in 256 bits; nulls are preserved.
pub fn u256_column_from_binary(col: &BinaryArray) -> Result<Decimal256Array> {
    let mut out = builder::Decimal256Builder::with_capacity(col.len());

    for value in col.iter() {
        match value {
            None => out.append_null(),
            Some(bytes) => {
                // Interpret the (possibly shorter than 32 bytes) big-endian
                // slice as a U256, then re-encode as arrow's i256.
                let n = U256::try_from_be_slice(bytes).context("parse u256")?;
                let n = arrow::datatypes::i256::from_be_bytes(n.to_be_bytes::<32>());
                out.append_value(n);
            }
        }
    }

    Ok(out.finish())
}

/// Serializes a Decimal256 (U256) column into big-endian binary values with
/// leading zero bytes trimmed. Nulls are preserved.
pub fn u256_column_to_binary(col: &Decimal256Array) -> BinaryArray {
    let mut out = builder::BinaryBuilder::with_capacity(col.len(), col.len() * 32);

    for value in col.iter() {
        if let Some(v) = value {
            let n = U256::from_be_bytes::<32>(v.to_be_bytes());
            // Trimmed encoding drops leading zero bytes of the 32-byte word.
            out.append_value(n.to_be_bytes_trimmed_vec());
        } else {
            out.append_null();
        }
    }

    out.finish()
}

/// Converts all Decimal256 (U256) columns in the batch to big endian binary values
pub fn u256_to_binary(data: &RecordBatch) -> Result<RecordBatch> {
    let schema = schema_binary_to_string(data.schema_ref());
    let mut columns = Vec::<Arc<dyn Array>>::with_capacity(data.columns().len());

    for col in data.columns().iter() {
        if col.data_type() == &DataType::Decimal256(76, 0) {
            let mut arr = builder::BinaryBuilder::new();

            let col = col.as_any().downcast_ref::<Decimal256Array>().unwrap();

            for val in col.iter() {
                arr.append_option(val.map(|v| v.to_be_bytes()));
            }

            columns.push(Arc::new(arr.finish()));
        } else {
            columns.push(col.clone());
        }
    }

    RecordBatch::try_new(Arc::new(schema), columns).context("construct arrow batch")
}