polars_python/series/
import.rs

1use arrow::array::Array;
2use arrow::ffi;
3use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};
4use polars::prelude::*;
5use pyo3::exceptions::{PyTypeError, PyValueError};
6use pyo3::prelude::*;
7use pyo3::types::{PyCapsule, PyTuple, PyType};
8
9use super::PySeries;
10
11/// Validate PyCapsule has provided name
12fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
13    let capsule_name = capsule.name()?;
14    if let Some(capsule_name) = capsule_name {
15        let capsule_name = capsule_name.to_str()?;
16        if capsule_name != expected_name {
17            return Err(PyValueError::new_err(format!(
18                "Expected name '{}' in PyCapsule, instead got '{}'",
19                expected_name, capsule_name
20            )));
21        }
22    } else {
23        return Err(PyValueError::new_err(
24            "Expected schema PyCapsule to have name set.",
25        ));
26    }
27
28    Ok(())
29}
30
31/// Import `__arrow_c_array__` across Python boundary
32pub(crate) fn call_arrow_c_array<'py>(
33    ob: &'py Bound<PyAny>,
34) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
35    if !ob.hasattr("__arrow_c_array__")? {
36        return Err(PyValueError::new_err(
37            "Expected an object with dunder __arrow_c_array__",
38        ));
39    }
40
41    let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
42    if !tuple.is_instance_of::<PyTuple>() {
43        return Err(PyTypeError::new_err(
44            "Expected __arrow_c_array__ to return a tuple.",
45        ));
46    }
47
48    let schema_capsule = tuple.get_item(0)?.downcast_into()?;
49    let array_capsule = tuple.get_item(1)?.downcast_into()?;
50    Ok((schema_capsule, array_capsule))
51}
52
53pub(crate) fn import_array_pycapsules(
54    schema_capsule: &Bound<PyCapsule>,
55    array_capsule: &Bound<PyCapsule>,
56) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
57    validate_pycapsule_name(schema_capsule, "arrow_schema")?;
58    validate_pycapsule_name(array_capsule, "arrow_array")?;
59
60    // # Safety
61    // schema_capsule holds a valid C ArrowSchema pointer, as defined by the Arrow PyCapsule
62    // Interface
63    // array_capsule holds a valid C ArrowArray pointer, as defined by the Arrow PyCapsule
64    // Interface
65    let (field, array) = unsafe {
66        let schema_ptr = schema_capsule.reference::<ArrowSchema>();
67        let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());
68
69        let field = ffi::import_field_from_c(schema_ptr).unwrap();
70        let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();
71        (field, array)
72    };
73
74    Ok((field, array))
75}
76
77/// Import `__arrow_c_stream__` across Python boundary.
78fn call_arrow_c_stream<'py>(ob: &'py Bound<PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
79    if !ob.hasattr("__arrow_c_stream__")? {
80        return Err(PyValueError::new_err(
81            "Expected an object with dunder __arrow_c_stream__",
82        ));
83    }
84
85    let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast_into()?;
86    Ok(capsule)
87}
88
89pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
90    validate_pycapsule_name(capsule, "arrow_array_stream")?;
91
92    // # Safety
93    // capsule holds a valid C ArrowArrayStream pointer, as defined by the Arrow PyCapsule
94    // Interface
95    let mut stream = unsafe {
96        // Takes ownership of the pointed to ArrowArrayStream
97        // This acts to move the data out of the capsule pointer, setting the release callback to NULL
98        let stream_ptr = Box::new(std::ptr::replace(
99            capsule.pointer() as _,
100            ArrowArrayStream::empty(),
101        ));
102        ArrowArrayStreamReader::try_new(stream_ptr)
103            .map_err(|err| PyValueError::new_err(err.to_string()))?
104    };
105
106    let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
107    while let Some(array) = unsafe { stream.next() } {
108        produced_arrays.push(array.unwrap());
109    }
110
111    // Series::try_from fails for an empty vec of chunks
112    let s = if produced_arrays.is_empty() {
113        let polars_dt = DataType::from_arrow_field(stream.field());
114        Series::new_empty(stream.field().name.clone(), &polars_dt)
115    } else {
116        Series::try_from((stream.field(), produced_arrays)).unwrap()
117    };
118    Ok(PySeries::new(s))
119}
120#[pymethods]
121impl PySeries {
122    #[classmethod]
123    pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
124        let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
125        let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
126        let s = Series::try_from((&field, array)).unwrap();
127        Ok(PySeries::new(s))
128    }
129
130    #[classmethod]
131    pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
132        let capsule = call_arrow_c_stream(ob)?;
133        import_stream_pycapsule(&capsule)
134    }
135}