polars_python/series/import.rs

1use arrow::array::Array;
2use arrow::ffi;
3use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};
4use polars::prelude::*;
5use polars_ffi::version_0::SeriesExport;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyTuple, PyType};
9
10use super::PySeries;
11use crate::error::PyPolarsErr;
12
13/// Validate PyCapsule has provided name
14fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
15    let capsule_name = capsule.name()?;
16    if let Some(capsule_name) = capsule_name {
17        let capsule_name = capsule_name.to_str()?;
18        if capsule_name != expected_name {
19            return Err(PyValueError::new_err(format!(
20                "Expected name '{expected_name}' in PyCapsule, instead got '{capsule_name}'"
21            )));
22        }
23    } else {
24        return Err(PyValueError::new_err(
25            "Expected schema PyCapsule to have name set.",
26        ));
27    }
28
29    Ok(())
30}
31
32/// Import `__arrow_c_array__` across Python boundary
33pub(crate) fn call_arrow_c_array<'py>(
34    ob: &Bound<'py, PyAny>,
35) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
36    if !ob.hasattr("__arrow_c_array__")? {
37        return Err(PyValueError::new_err(
38            "Expected an object with dunder __arrow_c_array__",
39        ));
40    }
41
42    let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
43    if !tuple.is_instance_of::<PyTuple>() {
44        return Err(PyTypeError::new_err(
45            "Expected __arrow_c_array__ to return a tuple.",
46        ));
47    }
48
49    let schema_capsule = tuple.get_item(0)?.downcast_into()?;
50    let array_capsule = tuple.get_item(1)?.downcast_into()?;
51    Ok((schema_capsule, array_capsule))
52}
53
54pub(crate) fn import_array_pycapsules(
55    schema_capsule: &Bound<PyCapsule>,
56    array_capsule: &Bound<PyCapsule>,
57) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
58    let field = import_schema_pycapsule(schema_capsule)?;
59
60    validate_pycapsule_name(array_capsule, "arrow_array")?;
61
62    // # Safety
63    // array_capsule holds a valid C ArrowArray pointer, as defined by the Arrow PyCapsule
64    // Interface
65    unsafe {
66        let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());
67        let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();
68
69        Ok((field, array))
70    }
71}
72
73pub(crate) fn import_schema_pycapsule(
74    schema_capsule: &Bound<PyCapsule>,
75) -> PyResult<arrow::datatypes::Field> {
76    validate_pycapsule_name(schema_capsule, "arrow_schema")?;
77
78    // # Safety
79    // schema_capsule holds a valid C ArrowSchema pointer, as defined by the Arrow PyCapsule
80    // Interface
81    unsafe {
82        let schema_ptr = schema_capsule.reference::<ArrowSchema>();
83        let field = ffi::import_field_from_c(schema_ptr).unwrap();
84
85        Ok(field)
86    }
87}
88
89/// Import `__arrow_c_stream__` across Python boundary.
90fn call_arrow_c_stream<'py>(ob: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
91    if !ob.hasattr("__arrow_c_stream__")? {
92        return Err(PyValueError::new_err(
93            "Expected an object with dunder __arrow_c_stream__",
94        ));
95    }
96
97    let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast_into()?;
98    Ok(capsule)
99}
100
101pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
102    validate_pycapsule_name(capsule, "arrow_array_stream")?;
103
104    // # Safety
105    // capsule holds a valid C ArrowArrayStream pointer, as defined by the Arrow PyCapsule
106    // Interface
107    let mut stream = unsafe {
108        // Takes ownership of the pointed to ArrowArrayStream
109        // This acts to move the data out of the capsule pointer, setting the release callback to NULL
110        let stream_ptr = Box::new(std::ptr::replace(
111            capsule.pointer() as _,
112            ArrowArrayStream::empty(),
113        ));
114        ArrowArrayStreamReader::try_new(stream_ptr)
115            .map_err(|err| PyValueError::new_err(err.to_string()))?
116    };
117
118    let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
119    while let Some(array) = unsafe { stream.next() } {
120        produced_arrays.push(array.unwrap());
121    }
122
123    // Series::try_from fails for an empty vec of chunks
124    let s = if produced_arrays.is_empty() {
125        let polars_dt = DataType::from_arrow_field(stream.field());
126        Series::new_empty(stream.field().name.clone(), &polars_dt)
127    } else {
128        Series::try_from((stream.field(), produced_arrays)).unwrap()
129    };
130    Ok(PySeries::new(s))
131}
132#[pymethods]
133impl PySeries {
134    #[classmethod]
135    pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
136        let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
137        let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
138        let s = Series::try_from((&field, array)).unwrap();
139        Ok(PySeries::new(s))
140    }
141
142    #[classmethod]
143    pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
144        let capsule = call_arrow_c_stream(ob)?;
145        import_stream_pycapsule(&capsule)
146    }
147
148    #[classmethod]
149    /// Import a series via polars-ffi
150    /// Takes ownership of the [`SeriesExport`] at [`location`]
151    /// # Safety
152    /// [`location`] should be the address of an allocated and initialized [`SeriesExport`]
153    pub unsafe fn _import(_cls: &Bound<PyType>, location: usize) -> PyResult<Self> {
154        let location = location as *mut SeriesExport;
155
156        // # Safety
157        // `location` should be valid for reading
158        let series = unsafe {
159            let export = location.read();
160            polars_ffi::version_0::import_series(export).map_err(PyPolarsErr::from)?
161        };
162        Ok(PySeries::from(series))
163    }
164}