polars_python/series/
import.rs1use arrow::array::{Array, PrimitiveArray};
2use arrow::ffi;
3use arrow::ffi::{ArrowArray, ArrowArrayStream, ArrowArrayStreamReader, ArrowSchema};
4use polars::prelude::*;
5use polars_ffi::version_0::SeriesExport;
6use pyo3::exceptions::{PyTypeError, PyValueError};
7use pyo3::prelude::*;
8use pyo3::pybacked::PyBackedBytes;
9use pyo3::types::{PyCapsule, PyTuple, PyType};
10
11use super::PySeries;
12use crate::error::PyPolarsErr;
13
14fn validate_pycapsule_name(capsule: &Bound<PyCapsule>, expected_name: &str) -> PyResult<()> {
16 let capsule_name = capsule.name()?;
17 if let Some(capsule_name) = capsule_name {
18 let capsule_name = capsule_name.to_str()?;
19 if capsule_name != expected_name {
20 return Err(PyValueError::new_err(format!(
21 "Expected name '{expected_name}' in PyCapsule, instead got '{capsule_name}'"
22 )));
23 }
24 } else {
25 return Err(PyValueError::new_err(
26 "Expected schema PyCapsule to have name set.",
27 ));
28 }
29
30 Ok(())
31}
32
33pub(crate) fn call_arrow_c_array<'py>(
35 ob: &Bound<'py, PyAny>,
36) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
37 if !ob.hasattr("__arrow_c_array__")? {
38 return Err(PyValueError::new_err(
39 "Expected an object with dunder __arrow_c_array__",
40 ));
41 }
42
43 let tuple = ob.getattr("__arrow_c_array__")?.call0()?;
44 if !tuple.is_instance_of::<PyTuple>() {
45 return Err(PyTypeError::new_err(
46 "Expected __arrow_c_array__ to return a tuple.",
47 ));
48 }
49
50 let schema_capsule = tuple.get_item(0)?.downcast_into()?;
51 let array_capsule = tuple.get_item(1)?.downcast_into()?;
52 Ok((schema_capsule, array_capsule))
53}
54
55pub(crate) fn import_array_pycapsules(
56 schema_capsule: &Bound<PyCapsule>,
57 array_capsule: &Bound<PyCapsule>,
58) -> PyResult<(arrow::datatypes::Field, Box<dyn Array>)> {
59 let field = import_schema_pycapsule(schema_capsule)?;
60
61 validate_pycapsule_name(array_capsule, "arrow_array")?;
62
63 unsafe {
67 let array_ptr = std::ptr::replace(array_capsule.pointer() as _, ArrowArray::empty());
68 let array = ffi::import_array_from_c(array_ptr, field.dtype().clone()).unwrap();
69
70 Ok((field, array))
71 }
72}
73
74pub(crate) fn import_schema_pycapsule(
75 schema_capsule: &Bound<PyCapsule>,
76) -> PyResult<arrow::datatypes::Field> {
77 validate_pycapsule_name(schema_capsule, "arrow_schema")?;
78
79 unsafe {
83 let schema_ptr = schema_capsule.reference::<ArrowSchema>();
84 let field = ffi::import_field_from_c(schema_ptr).unwrap();
85
86 Ok(field)
87 }
88}
89
90fn call_arrow_c_stream<'py>(ob: &Bound<'py, PyAny>) -> PyResult<Bound<'py, PyCapsule>> {
92 if !ob.hasattr("__arrow_c_stream__")? {
93 return Err(PyValueError::new_err(
94 "Expected an object with dunder __arrow_c_stream__",
95 ));
96 }
97
98 let capsule = ob.getattr("__arrow_c_stream__")?.call0()?.downcast_into()?;
99 Ok(capsule)
100}
101
102pub(crate) fn import_stream_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<PySeries> {
103 validate_pycapsule_name(capsule, "arrow_array_stream")?;
104
105 let mut stream = unsafe {
109 let stream_ptr = Box::new(std::ptr::replace(
112 capsule.pointer() as _,
113 ArrowArrayStream::empty(),
114 ));
115 ArrowArrayStreamReader::try_new(stream_ptr)
116 .map_err(|err| PyValueError::new_err(err.to_string()))?
117 };
118
119 let mut produced_arrays: Vec<Box<dyn Array>> = vec![];
120 while let Some(array) = unsafe { stream.next() } {
121 produced_arrays.push(array.unwrap());
122 }
123
124 let s = if produced_arrays.is_empty() {
126 let polars_dt = DataType::from_arrow_field(stream.field());
127 Series::new_empty(stream.field().name.clone(), &polars_dt)
128 } else {
129 Series::try_from((stream.field(), produced_arrays)).unwrap()
130 };
131 Ok(PySeries::new(s))
132}
133#[pymethods]
134impl PySeries {
135 #[classmethod]
136 pub fn from_arrow_c_array(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
137 let (schema_capsule, array_capsule) = call_arrow_c_array(ob)?;
138 let (field, array) = import_array_pycapsules(&schema_capsule, &array_capsule)?;
139 let s = Series::try_from((&field, array)).unwrap();
140 Ok(PySeries::new(s))
141 }
142
143 #[classmethod]
144 pub fn from_arrow_c_stream(_cls: &Bound<PyType>, ob: &Bound<'_, PyAny>) -> PyResult<Self> {
145 let capsule = call_arrow_c_stream(ob)?;
146 import_stream_pycapsule(&capsule)
147 }
148
149 #[classmethod]
150 pub unsafe fn _import(_cls: &Bound<PyType>, location: usize) -> PyResult<Self> {
155 let location = location as *mut SeriesExport;
156
157 let series = unsafe {
160 let export = location.read();
161 polars_ffi::version_0::import_series(export).map_err(PyPolarsErr::from)?
162 };
163 Ok(PySeries::from(series))
164 }
165
166 #[staticmethod]
167 pub fn _import_decimal_from_iceberg_binary_repr(
168 bytes_list: &Bound<PyAny>, precision: usize,
170 scale: usize,
171 ) -> PyResult<Self> {
172 let max_abs_decimal_value = 10_i128.pow(u32::try_from(precision).unwrap()) - 1;
177
178 let out: Vec<i128> = bytes_list
179 .try_iter()?
180 .map(|bytes| {
181 let be_bytes: Option<PyBackedBytes> = bytes?.extract()?;
182
183 let mut le_bytes: [u8; 16] = [0; _];
184
185 if let Some(be_bytes) = be_bytes.as_deref() {
186 if be_bytes.len() > le_bytes.len() {
187 return Err(PyValueError::new_err(format!(
188 "iceberg binary data for decimal exceeded 16 bytes: {}",
189 be_bytes.len()
190 )));
191 }
192
193 for (i, byte) in be_bytes.iter().rev().enumerate() {
194 le_bytes[i] = *byte;
195 }
196 }
197
198 let value = i128::from_le_bytes(le_bytes);
199
200 if value.abs() > max_abs_decimal_value {
201 return Err(PyValueError::new_err(format!(
202 "iceberg decoded value for decimal exceeded precision: \
203 value: {value}, precision: {precision}",
204 )));
205 }
206
207 Ok(value)
208 })
209 .collect::<PyResult<_>>()?;
210
211 Ok(PySeries::from(unsafe {
212 Series::from_chunks_and_dtype_unchecked(
213 PlSmallStr::EMPTY,
214 vec![PrimitiveArray::<i128>::from_vec(out).boxed()],
215 &DataType::Decimal(precision, scale),
216 )
217 }))
218 }
219}