Skip to main content

polars_python/series/import.rs

1use arrow::array::{Array, PrimitiveArray};
2use arrow::ffi;
3use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};
4use polars::prelude::*;
5use polars_ffi::version_0::SeriesExport;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::pybacked::PyBackedBytes;
9use pyo3::types::{PyCapsule, PyTuple, PyType};
10
11use super::PySeries;
12use crate::error::PyPolarsErr;
13
14/// Validate PyCapsule has provided name
15fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
16    let capsule_name = capsule.name()?;
17    if let Some(capsule_name) = capsule_name {
18        let capsule_name = unsafe { capsule_name.as_cstr() };
19        if capsule_name.to_str() != Ok(expected_name) {
20            return Err(PyValueError::new_err(format!(
21                "Expected name '{expected_name}' in PyCapsule, instead got '{capsule_name:?}'"
22            )));
23        }
24    } else {
25        return Err(PyValueError::new_err(
26            "Expected schema PyCapsule to have name set.",
27        ));
28    }
29
30    Ok(())
31}
32
33/// Import `__arrow_c_array__` across Python boundary
34pub(crate) fn call_arrow_c_array<'py>(
35    ob: &Bound<'py, PyAny>,
36) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
37    if !ob.hasattr("__arrow_c_array__")? {
38        return Err(PyValueError::new_err(
39            "Expected an object with dunder __arrow_c_array__",
40        ));
41    }
42
43    let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
44    if !tuple.is_instance_of::<PyTuple>() {
45        return Err(PyTypeError::new_err(
46            "Expected __arrow_c_array__ to return a tuple.",
47        ));
48    }
49
50    let schema_capsule = tuple.get_item(0)?.cast_into()?;
51    let array_capsule = tuple.get_item(1)?.cast_into()?;
52    Ok((schema_capsule, array_capsule))
53}
54
55pub(crate) fn import_array_pycapsules(
56    schema_capsule: &Bound<PyCapsule>,
57    array_capsule: &Bound<PyCapsule>,
58) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
59    let field = import_schema_pycapsule(schema_capsule)?;
60
61    validate_pycapsule_name(array_capsule, "arrow_array")?;
62
63    // # Safety
64    // array_capsule holds a valid C ArrowArray pointer, as defined by the Arrow PyCapsule
65    // Interface
66    unsafe {
67        #[allow(deprecated)]
68        let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());
69        let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();
70
71        Ok((field, array))
72    }
73}
74
75pub(crate) fn import_schema_pycapsule(
76    schema_capsule: &Bound<PyCapsule>,
77) -> PyResult<arrow::datatypes::Field> {
78    validate_pycapsule_name(schema_capsule, "arrow_schema")?;
79
80    // # Safety
81    // schema_capsule holds a valid C ArrowSchema pointer, as defined by the Arrow PyCapsule
82    // Interface
83    unsafe {
84        #[allow(deprecated)]
85        let schema_ptr = schema_capsule.reference::<ArrowSchema>();
86        let field = ffi::import_field_from_c(schema_ptr).unwrap();
87
88        Ok(field)
89    }
90}
91
92/// Import `__arrow_c_stream__` across Python boundary.
93fn call_arrow_c_stream<'py>(ob: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
94    if !ob.hasattr("__arrow_c_stream__")? {
95        return Err(PyValueError::new_err(
96            "Expected an object with dunder __arrow_c_stream__",
97        ));
98    }
99
100    let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.cast_into()?;
101    Ok(capsule)
102}
103
104pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
105    validate_pycapsule_name(capsule, "arrow_array_stream")?;
106
107    // # Safety
108    // capsule holds a valid C ArrowArrayStream pointer, as defined by the Arrow PyCapsule
109    // Interface
110    let mut stream = unsafe {
111        // Takes ownership of the pointed to ArrowArrayStream
112        // This acts to move the data out of the capsule pointer, setting the release callback to NULL
113        #[allow(deprecated)]
114        let stream_ptr = Box::new(std::ptr::replace(
115            capsule.pointer() as _,
116            ArrowArrayStream::empty(),
117        ));
118        ArrowArrayStreamReader::try_new(stream_ptr)
119            .map_err(|err| PyValueError::new_err(err.to_string()))?
120    };
121
122    let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
123    while let Some(array) = unsafe { stream.next() } {
124        produced_arrays.push(array.map_err(PyPolarsErr::from)?);
125    }
126
127    // Series::try_from fails for an empty vec of chunks
128    let s = if produced_arrays.is_empty() {
129        let polars_dt = DataType::from_arrow_field(stream.field());
130        Series::new_empty(stream.field().name.clone(), &polars_dt)
131    } else {
132        Series::try_from((stream.field(), produced_arrays)).map_err(PyPolarsErr::from)?
133    };
134    Ok(PySeries::new(s))
135}
136#[pymethods]
137impl PySeries {
138    #[classmethod]
139    pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
140        let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
141        let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
142        let s = Series::try_from((&field, array)).unwrap();
143        Ok(PySeries::new(s))
144    }
145
146    #[classmethod]
147    pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
148        let capsule = call_arrow_c_stream(ob)?;
149        import_stream_pycapsule(&capsule)
150    }
151
152    #[classmethod]
153    /// Import a series via polars-ffi
154    /// Takes ownership of the [`SeriesExport`] at [`location`]
155    /// # Safety
156    /// [`location`] should be the address of an allocated and initialized [`SeriesExport`]
157    pub unsafe fn _import(_cls: &Bound<PyType>, location: usize) -> PyResult<Self> {
158        let location = location as *mut SeriesExport;
159
160        // # Safety
161        // `location` should be valid for reading
162        let series = unsafe {
163            let export = location.read();
164            polars_ffi::version_0::import_series(export).map_err(PyPolarsErr::from)?
165        };
166        Ok(PySeries::from(series))
167    }
168
169    #[staticmethod]
170    pub fn _import_decimal_from_iceberg_binary_repr(
171        bytes_list: &Bound<PyAny>, // list[bytes | None]
172        precision: usize,
173        scale: usize,
174    ) -> PyResult<Self> {
175        // From iceberg spec:
176        // * Decimal(P, S): Stores unscaled value as two’s-complement
177        //   big-endian binary, using the minimum number of bytes for the
178        //   value.
179        let max_abs_decimal_value = 10_i128.pow(u32::try_from(precision).unwrap()) - 1;
180
181        let out: Vec<i128> = bytes_list
182            .try_iter()?
183            .map(|bytes| {
184                let be_bytes: Option<PyBackedBytes> = bytes?.extract()?;
185
186                let mut le_bytes: [u8; 16] = [0; _];
187
188                if let Some(be_bytes) = be_bytes.as_deref() {
189                    if be_bytes.len() > le_bytes.len() {
190                        return Err(PyValueError::new_err(format!(
191                            "iceberg binary data for decimal exceeded 16 bytes: {}",
192                            be_bytes.len()
193                        )));
194                    }
195
196                    for (i, byte) in be_bytes.iter().rev().enumerate() {
197                        le_bytes[i] = *byte;
198                    }
199                }
200
201                let value = i128::from_le_bytes(le_bytes);
202
203                if value.abs() > max_abs_decimal_value {
204                    return Err(PyValueError::new_err(format!(
205                        "iceberg decoded value for decimal exceeded precision: \
206                        value: {value}, precision: {precision}",
207                    )));
208                }
209
210                Ok(value)
211            })
212            .collect::<PyResult<_>>()?;
213
214        Ok(PySeries::from(unsafe {
215            Series::from_chunks_and_dtype_unchecked(
216                PlSmallStr::EMPTY,
217                vec![PrimitiveArray::<i128>::from_vec(out).boxed()],
218                &DataType::Decimal(precision, scale),
219            )
220        }))
221    }
222}