polars_python/series/
import.rs1use arrow::array::Array;
2use arrow::ffi;
3use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};
4use polars::prelude::*;
5use polars_ffi::version_0::SeriesExport;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyTuple, PyType};
9
10use super::PySeries;
11use crate::error::PyPolarsErr;
12
13fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
15 let capsule_name = capsule.name()?;
16 if let Some(capsule_name) = capsule_name {
17 let capsule_name = capsule_name.to_str()?;
18 if capsule_name != expected_name {
19 return Err(PyValueError::new_err(format!(
20 "Expected name '{}' in PyCapsule, instead got '{}'",
21 expected_name, capsule_name
22 )));
23 }
24 } else {
25 return Err(PyValueError::new_err(
26 "Expected schema PyCapsule to have name set.",
27 ));
28 }
29
30 Ok(())
31}
32
33pub(crate) fn call_arrow_c_array<'py>(
35 ob: &Bound<'py, PyAny>,
36) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
37 if !ob.hasattr("__arrow_c_array__")? {
38 return Err(PyValueError::new_err(
39 "Expected an object with dunder __arrow_c_array__",
40 ));
41 }
42
43 let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
44 if !tuple.is_instance_of::<PyTuple>() {
45 return Err(PyTypeError::new_err(
46 "Expected __arrow_c_array__ to return a tuple.",
47 ));
48 }
49
50 let schema_capsule = tuple.get_item(0)?.downcast_into()?;
51 let array_capsule = tuple.get_item(1)?.downcast_into()?;
52 Ok((schema_capsule, array_capsule))
53}
54
55pub(crate) fn import_array_pycapsules(
56 schema_capsule: &Bound<PyCapsule>,
57 array_capsule: &Bound<PyCapsule>,
58) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
59 validate_pycapsule_name(schema_capsule, "arrow_schema")?;
60 validate_pycapsule_name(array_capsule, "arrow_array")?;
61
62 let (field, array) = unsafe {
68 let schema_ptr = schema_capsule.reference::<ArrowSchema>();
69 let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());
70
71 let field = ffi::import_field_from_c(schema_ptr).unwrap();
72 let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();
73 (field, array)
74 };
75
76 Ok((field, array))
77}
78
79fn call_arrow_c_stream<'py>(ob: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
81 if !ob.hasattr("__arrow_c_stream__")? {
82 return Err(PyValueError::new_err(
83 "Expected an object with dunder __arrow_c_stream__",
84 ));
85 }
86
87 let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast_into()?;
88 Ok(capsule)
89}
90
91pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
92 validate_pycapsule_name(capsule, "arrow_array_stream")?;
93
94 let mut stream = unsafe {
98 let stream_ptr = Box::new(std::ptr::replace(
101 capsule.pointer() as _,
102 ArrowArrayStream::empty(),
103 ));
104 ArrowArrayStreamReader::try_new(stream_ptr)
105 .map_err(|err| PyValueError::new_err(err.to_string()))?
106 };
107
108 let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
109 while let Some(array) = unsafe { stream.next() } {
110 produced_arrays.push(array.unwrap());
111 }
112
113 let s = if produced_arrays.is_empty() {
115 let polars_dt = DataType::from_arrow_field(stream.field());
116 Series::new_empty(stream.field().name.clone(), &polars_dt)
117 } else {
118 Series::try_from((stream.field(), produced_arrays)).unwrap()
119 };
120 Ok(PySeries::new(s))
121}
122#[pymethods]
123impl PySeries {
124 #[classmethod]
125 pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
126 let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
127 let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
128 let s = Series::try_from((&field, array)).unwrap();
129 Ok(PySeries::new(s))
130 }
131
132 #[classmethod]
133 pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
134 let capsule = call_arrow_c_stream(ob)?;
135 import_stream_pycapsule(&capsule)
136 }
137
138 #[classmethod]
139 pub unsafe fn _import(_cls: &Bound<PyType>, location: usize) -> PyResult<Self> {
144 let location = location as *mut SeriesExport;
145
146 let series = unsafe {
149 let export = location.read();
150 polars_ffi::version_0::import_series(export).map_err(PyPolarsErr::from)?
151 };
152 Ok(PySeries { series })
153 }
154}