1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars_error::polars_err;
14use polars_io::cloud::CloudOptions;
15use polars_utils::create_file;
16use polars_utils::mmap::MemSlice;
17use pyo3::exceptions::PyTypeError;
18use pyo3::prelude::*;
19use pyo3::types::{PyBytes, PyString, PyStringMethods};
20use pyo3::IntoPyObjectExt;
21
22use crate::error::PyPolarsErr;
23use crate::prelude::resolve_homedir;
24
/// Wrapper around an arbitrary Python object that is used as a file.
///
/// The object is only ever accessed through its Python methods (`read`,
/// `write`, `seek`, `flush`) while holding the GIL.
pub struct PyFileLikeObject {
    /// The wrapped Python object.
    inner: PyObject,
}
28
29impl Clone for PyFileLikeObject {
30 fn clone(&self) -> Self {
31 Python::with_gil(|py| Self {
32 inner: self.inner.clone_ref(py),
33 })
34 }
35}
36
37impl PyFileLikeObject {
39 pub fn new(object: PyObject) -> Self {
43 PyFileLikeObject { inner: object }
44 }
45
46 pub fn to_memslice(&self) -> MemSlice {
47 Python::with_gil(|py| {
48 let bytes = self
49 .inner
50 .call_method(py, "read", (), None)
51 .expect("no read method found");
52
53 if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
54 return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
55 }
56
57 if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
58 return match b.to_cow().expect("PyString is not valid UTF-8") {
59 Cow::Borrowed(v) => {
60 MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
61 },
62 Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
63 };
64 }
65
66 panic!("Expecting to be able to downcast into bytes from read result.");
67 })
68 }
69
70 pub fn ensure_requirements(
74 object: &Bound<PyAny>,
75 read: bool,
76 write: bool,
77 seek: bool,
78 ) -> PyResult<()> {
79 if read && object.getattr("read").is_err() {
80 return Err(PyErr::new::<PyTypeError, _>(
81 "Object does not have a .read() method.",
82 ));
83 }
84
85 if seek && object.getattr("seek").is_err() {
86 return Err(PyErr::new::<PyTypeError, _>(
87 "Object does not have a .seek() method.",
88 ));
89 }
90
91 if write && object.getattr("write").is_err() {
92 return Err(PyErr::new::<PyTypeError, _>(
93 "Object does not have a .write() method.",
94 ));
95 }
96
97 Ok(())
98 }
99}
100
101fn pyerr_to_io_err(e: PyErr) -> io::Error {
103 Python::with_gil(|py| {
104 let e_as_object: PyObject = e.into_py_any(py).unwrap();
105
106 match e_as_object.call_method(py, "__str__", (), None) {
107 Ok(repr) => match repr.extract::<String>(py) {
108 Ok(s) => io::Error::new(io::ErrorKind::Other, s),
109 Err(_e) => io::Error::new(io::ErrorKind::Other, "An unknown error has occurred"),
110 },
111 Err(_) => io::Error::new(io::ErrorKind::Other, "Err doesn't have __str__"),
112 }
113 })
114}
115
116impl Read for PyFileLikeObject {
117 fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
118 Python::with_gil(|py| {
119 let bytes = self
120 .inner
121 .call_method(py, "read", (buf.len(),), None)
122 .map_err(pyerr_to_io_err)?;
123
124 let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
125
126 if let Ok(bytes) = opt_bytes {
127 buf.write_all(bytes.as_bytes())?;
128
129 bytes.len().map_err(pyerr_to_io_err)
130 } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
131 let s = s.to_cow().map_err(pyerr_to_io_err)?;
132 buf.write_all(s.as_bytes())?;
133 Ok(s.len())
134 } else {
135 Err(io::Error::new(
136 ErrorKind::InvalidInput,
137 polars_err!(InvalidOperation: "could not read from input"),
138 ))
139 }
140 })
141 }
142}
143
144impl Write for PyFileLikeObject {
145 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
146 Python::with_gil(|py| {
147 let pybytes = PyBytes::new(py, buf);
148
149 let number_bytes_written = self
150 .inner
151 .call_method(py, "write", (pybytes,), None)
152 .map_err(pyerr_to_io_err)?;
153
154 number_bytes_written.extract(py).map_err(pyerr_to_io_err)
155 })
156 }
157
158 fn flush(&mut self) -> Result<(), io::Error> {
159 Python::with_gil(|py| {
160 self.inner
161 .call_method(py, "flush", (), None)
162 .map_err(pyerr_to_io_err)?;
163
164 Ok(())
165 })
166 }
167}
168
169impl Seek for PyFileLikeObject {
170 fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
171 Python::with_gil(|py| {
172 let (whence, offset) = match pos {
173 SeekFrom::Start(i) => (0, i as i64),
174 SeekFrom::Current(i) => (1, i),
175 SeekFrom::End(i) => (2, i),
176 };
177
178 let new_position = self
179 .inner
180 .call_method(py, "seek", (offset, whence), None)
181 .map_err(pyerr_to_io_err)?;
182
183 new_position.extract(py).map_err(pyerr_to_io_err)
184 })
185 }
186}
187
/// Bundle of the I/O traits (plus `Send + Sync`) required of a file handle that
/// is passed around as `Box<dyn FileLike>`.
pub trait FileLike: Read + Write + Seek + Sync + Send {}

impl FileLike for File {}
impl FileLike for PyFileLikeObject {}
impl MmapBytesReader for PyFileLikeObject {}
193
/// A file handle obtained from a Python argument: either a Python file-like
/// object accessed through the GIL, or a native Rust [`File`].
pub enum EitherRustPythonFile {
    /// Python object accessed through its methods.
    Py(PyFileLikeObject),
    /// Native file, e.g. opened from a path or built from a duplicated
    /// file descriptor.
    Rust(File),
}
198
199impl EitherRustPythonFile {
200 pub fn into_dyn(self) -> Box<dyn FileLike> {
201 match self {
202 EitherRustPythonFile::Py(f) => Box::new(f),
203 EitherRustPythonFile::Rust(f) => Box::new(f),
204 }
205 }
206
207 fn into_scan_source_input(self) -> PythonScanSourceInput {
208 match self {
209 EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
210 EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
211 }
212 }
213
214 pub fn into_dyn_writeable(self) -> Box<dyn Write + Send> {
215 match self {
216 EitherRustPythonFile::Py(f) => Box::new(f),
217 EitherRustPythonFile::Rust(f) => Box::new(f),
218 }
219 }
220}
221
/// The resolved form of a Python scan-source argument.
pub enum PythonScanSourceInput {
    /// In-memory contents, e.g. `bytes` or data read from a Python file object.
    Buffer(MemSlice),
    /// A path given as a string, after `resolve_homedir`.
    Path(PathBuf),
    /// An already opened native file.
    File(File),
}
227
/// Converts a Python file-like object into an [`EitherRustPythonFile`], plus its
/// path when one can be determined.
///
/// On Unix, the fast path applies to exact instances of `io.FileIO`,
/// `io.BufferedReader`, `io.BufferedWriter`, `io.BufferedRandom`,
/// `io.BufferedRWPair` and UTF-8 `io.TextIOWrapper`. For these, the
/// synchronisation call (non-`FileIO` classes only), `fileno()` and the
/// descriptor duplication must all succeed. The object is then turned into a
/// native [`File`] over a duplicate of its descriptor. Otherwise the object is
/// wrapped in a [`PyFileLikeObject`]; a `TextIOWrapper` (including subclasses)
/// is first replaced by its `buffer` attribute.
///
/// With `write == true` the object is prepared with `flush()` and must have a
/// `write` method. Otherwise it is prepared with `seek(0, 1)` and must have
/// `read` and `seek` methods.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// - importing `io` fails;
/// - a `TextIOWrapper`'s `encoding` cannot be read or is not UTF-8;
/// - `flush`/`seek`/`buffer` fail on the `TextIOWrapper` fallback path;
/// - [`PyFileLikeObject::ensure_requirements`] rejects the object.
fn try_get_pyfile(
    py: Python,
    py_f: Bound<'_, PyAny>,
    write: bool,
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
    let io = py.import("io")?;
    // True when the object's `encoding` attribute names UTF-8 (either spelling,
    // ASCII case-insensitive).
    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
        let encoding = py_f.getattr("encoding")?;
        let encoding = encoding.extract::<Cow<str>>()?;
        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
    };

    // Fast path: give Rust the OS file descriptor directly so reads and writes
    // do not go through Python method calls.
    //
    // Exact-type checks are used, so subclasses take the generic path below.
    // Because `&&` binds tighter than `||`, the flush/seek synchronisation step
    // only runs for the buffered and text classes, never for `FileIO`.
    #[cfg(target_family = "unix")]
    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
                && is_utf8_encoding(&py_f)?))
            && if write {
                // Push Python-side buffered writes to the descriptor before
                // Rust starts writing at the shared file offset.
                py_f.call_method0("flush").is_ok()
            } else {
                // A relative seek of 0 is presumably used to discard read-ahead
                // so the descriptor offset matches the logical read position —
                // TODO confirm for each class.
                py_f.call_method1("seek", (0, 1)).is_ok()
            })
    .then(|| {
        py_f.getattr("fileno")
            .and_then(|fileno| fileno.call0())
            .and_then(|fileno| fileno.extract::<libc::c_int>())
            .ok()
    })
    .flatten()
    .map(|fileno| unsafe {
        // SAFETY: `fcntl(F_DUPFD_CLOEXEC)` only inspects `fileno` and returns
        // a new descriptor, or -1 on failure (filtered out below).
        //
        // We duplicate instead of adopting `fileno` because the `File` built
        // below closes its descriptor on drop, and that must not close the
        // Python object's descriptor. The duplicate still shares the file
        // offset with the original. `F_DUPFD_CLOEXEC` also sets close-on-exec
        // on the duplicate.
        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
    })
    .filter(|fileno| *fileno != -1)
    .map(|fileno| fileno as RawFd)
    {
        return Ok((
            // SAFETY: `fd` is the freshly duplicated descriptor, owned by
            // nothing else, so the `File` may take ownership of it.
            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd) }),
            // Resolves the path through procfs; yields `None` where
            // `/proc/self/fd` is unavailable or canonicalisation fails.
            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
        ));
    }

    // Generic path. Text streams (including subclasses) are unwrapped to their
    // binary `buffer`, which is only accepted for UTF-8 encodings.
    let py_f = if py_f
        .is_instance(&io.getattr("TextIOWrapper").unwrap())
        .unwrap_or_default()
    {
        if !is_utf8_encoding(&py_f)? {
            return Err(PyPolarsErr::from(
                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
            )
            .into());
        }
        // Synchronise the text layer with its underlying buffer before
        // bypassing it (same flush / `seek(0, 1)` choice as the fast path).
        if write {
            py_f.call_method0("flush")?;
        } else {
            py_f.call_method1("seek", (0, 1))?;
        }
        py_f.getattr("buffer")?
    } else {
        py_f
    };
    // Reading needs `read` and `seek`; writing needs `write`.
    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
    let f = PyFileLikeObject::new(py_f.unbind());
    Ok((EitherRustPythonFile::Py(f), None))
}
319
320pub fn get_python_scan_source_input(
321 py_f: PyObject,
322 write: bool,
323) -> PyResult<PythonScanSourceInput> {
324 Python::with_gil(|py| {
325 let py_f = py_f.into_bound(py);
326
327 let py_f = read_if_bytesio(py_f);
332
333 if let Ok(b) = py_f.downcast::<PyBytes>() {
335 return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
336 b.as_bytes(),
337 Arc::new(b.clone().unbind()),
339 )));
340 }
341
342 if let Ok(s) = py_f.extract::<Cow<str>>() {
343 let file_path = resolve_homedir(&&*s);
344 Ok(PythonScanSourceInput::Path(file_path))
345 } else {
346 Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
347 }
348 })
349}
350
351fn get_either_buffer_or_path(
352 py_f: PyObject,
353 write: bool,
354) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
355 Python::with_gil(|py| {
356 let py_f = py_f.into_bound(py);
357 if let Ok(s) = py_f.extract::<Cow<str>>() {
358 let file_path = resolve_homedir(&&*s);
359 let f = if write {
360 create_file(&file_path).map_err(PyPolarsErr::from)?
361 } else {
362 polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
363 };
364 Ok((EitherRustPythonFile::Rust(f), Some(file_path)))
365 } else {
366 try_get_pyfile(py, py_f, write)
367 }
368 })
369}
370
371pub fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
375 Ok(get_either_buffer_or_path(py_f, write)?.0)
376}
377
378pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
379 Ok(get_either_file(f, truncate)?.into_dyn())
380}
381
382fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
385 let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
386 if py_f.is_instance(&bytes_io).unwrap() {
387 let Ok(bytes) = py_f.call_method0("getvalue") else {
390 return py_f;
391 };
392 return bytes;
393 }
394 py_f
395}
396
397pub fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
399 get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
400}
401
402pub fn get_mmap_bytes_reader_and_path(
403 py_f: &Bound<PyAny>,
404) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
405 let py_f = read_if_bytesio(py_f.clone());
406
407 if let Ok(bytes) = py_f.downcast::<PyBytes>() {
409 Ok((
410 Box::new(Cursor::new(MemSlice::from_arc(
411 bytes.as_bytes(),
412 Arc::new(py_f.clone().unbind()),
413 ))),
414 None,
415 ))
416 }
417 else {
419 match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
420 (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
421 (EitherRustPythonFile::Py(f), path) => {
422 Ok((Box::new(Cursor::new(f.to_memslice())), path))
423 },
424 }
425 }
426}
427
428pub fn try_get_writeable(
429 py_f: PyObject,
430 cloud_options: Option<&CloudOptions>,
431) -> PyResult<Box<dyn Write + Send>> {
432 Python::with_gil(|py| {
433 let py_f = py_f.into_bound(py);
434
435 if let Ok(s) = py_f.extract::<Cow<str>>() {
436 polars::prelude::file::try_get_writeable(&s, cloud_options)
437 .map_err(PyPolarsErr::from)
438 .map_err(|e| e.into())
439 } else {
440 Ok(try_get_pyfile(py, py_f, true)?.0.into_dyn_writeable())
441 }
442 })
443}