polars_python/
file.rs

1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars_error::polars_err;
14use polars_io::cloud::CloudOptions;
15use polars_utils::create_file;
16use polars_utils::mmap::MemSlice;
17use pyo3::exceptions::PyTypeError;
18use pyo3::prelude::*;
19use pyo3::types::{PyBytes, PyString, PyStringMethods};
20use pyo3::IntoPyObjectExt;
21
22use crate::error::PyPolarsErr;
23use crate::prelude::resolve_homedir;
24
/// Owned handle to a Python object that is used as a file-like object.
///
/// The `Read`, `Write` and `Seek` implementations in this file forward to the
/// Python methods of the same name on the wrapped object.
pub struct PyFileLikeObject {
    /// The wrapped Python object.
    inner: PyObject,
}
28
29impl Clone for PyFileLikeObject {
30    fn clone(&self) -> Self {
31        Python::with_gil(|py| Self {
32            inner: self.inner.clone_ref(py),
33        })
34    }
35}
36
37/// Wraps a `PyObject`, and implements read, seek, and write for it.
38impl PyFileLikeObject {
39    /// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
40    /// To assert the object has the required methods,
41    /// instantiate it with `PyFileLikeObject::require`
42    pub fn new(object: PyObject) -> Self {
43        PyFileLikeObject { inner: object }
44    }
45
46    pub fn to_memslice(&self) -> MemSlice {
47        Python::with_gil(|py| {
48            let bytes = self
49                .inner
50                .call_method(py, "read", (), None)
51                .expect("no read method found");
52
53            if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
54                return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
55            }
56
57            if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
58                return match b.to_cow().expect("PyString is not valid UTF-8") {
59                    Cow::Borrowed(v) => {
60                        MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
61                    },
62                    Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
63                };
64            }
65
66            panic!("Expecting to be able to downcast into bytes from read result.");
67        })
68    }
69
70    /// Validates that the underlying
71    /// python object has a `read`, `write`, and `seek` methods in respect to parameters.
72    /// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
73    pub fn ensure_requirements(
74        object: &Bound<PyAny>,
75        read: bool,
76        write: bool,
77        seek: bool,
78    ) -> PyResult<()> {
79        if read && object.getattr("read").is_err() {
80            return Err(PyErr::new::<PyTypeError, _>(
81                "Object does not have a .read() method.",
82            ));
83        }
84
85        if seek && object.getattr("seek").is_err() {
86            return Err(PyErr::new::<PyTypeError, _>(
87                "Object does not have a .seek() method.",
88            ));
89        }
90
91        if write && object.getattr("write").is_err() {
92            return Err(PyErr::new::<PyTypeError, _>(
93                "Object does not have a .write() method.",
94            ));
95        }
96
97        Ok(())
98    }
99}
100
101/// Extracts a string repr from, and returns an IO error to send back to rust.
102fn pyerr_to_io_err(e: PyErr) -> io::Error {
103    Python::with_gil(|py| {
104        let e_as_object: PyObject = e.into_py_any(py).unwrap();
105
106        match e_as_object.call_method(py, "__str__", (), None) {
107            Ok(repr) => match repr.extract::<String>(py) {
108                Ok(s) => io::Error::new(io::ErrorKind::Other, s),
109                Err(_e) => io::Error::new(io::ErrorKind::Other, "An unknown error has occurred"),
110            },
111            Err(_) => io::Error::new(io::ErrorKind::Other, "Err doesn't have __str__"),
112        }
113    })
114}
115
116impl Read for PyFileLikeObject {
117    fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
118        Python::with_gil(|py| {
119            let bytes = self
120                .inner
121                .call_method(py, "read", (buf.len(),), None)
122                .map_err(pyerr_to_io_err)?;
123
124            let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
125
126            if let Ok(bytes) = opt_bytes {
127                buf.write_all(bytes.as_bytes())?;
128
129                bytes.len().map_err(pyerr_to_io_err)
130            } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
131                let s = s.to_cow().map_err(pyerr_to_io_err)?;
132                buf.write_all(s.as_bytes())?;
133                Ok(s.len())
134            } else {
135                Err(io::Error::new(
136                    ErrorKind::InvalidInput,
137                    polars_err!(InvalidOperation: "could not read from input"),
138                ))
139            }
140        })
141    }
142}
143
144impl Write for PyFileLikeObject {
145    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
146        Python::with_gil(|py| {
147            let pybytes = PyBytes::new(py, buf);
148
149            let number_bytes_written = self
150                .inner
151                .call_method(py, "write", (pybytes,), None)
152                .map_err(pyerr_to_io_err)?;
153
154            number_bytes_written.extract(py).map_err(pyerr_to_io_err)
155        })
156    }
157
158    fn flush(&mut self) -> Result<(), io::Error> {
159        Python::with_gil(|py| {
160            self.inner
161                .call_method(py, "flush", (), None)
162                .map_err(pyerr_to_io_err)?;
163
164            Ok(())
165        })
166    }
167}
168
169impl Seek for PyFileLikeObject {
170    fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
171        Python::with_gil(|py| {
172            let (whence, offset) = match pos {
173                SeekFrom::Start(i) => (0, i as i64),
174                SeekFrom::Current(i) => (1, i),
175                SeekFrom::End(i) => (2, i),
176            };
177
178            let new_position = self
179                .inner
180                .call_method(py, "seek", (offset, whence), None)
181                .map_err(pyerr_to_io_err)?;
182
183            new_position.extract(py).map_err(pyerr_to_io_err)
184        })
185    }
186}
187
/// Marker trait for values that can be read, written and seeked, and that are
/// both `Sync` and `Send`.
pub trait FileLike: Read + Write + Seek + Sync + Send {}

// Native files and wrapped Python objects can both be used as a `FileLike`.
impl FileLike for File {}
impl FileLike for PyFileLikeObject {}
// Uses only what the `MmapBytesReader` trait provides by default.
impl MmapBytesReader for PyFileLikeObject {}
193
/// A file handle that is backed either by a Python object or by a native
/// Rust `File`.
pub enum EitherRustPythonFile {
    /// A wrapped Python file-like object.
    Py(PyFileLikeObject),
    /// A native file handle.
    Rust(File),
}
198
199impl EitherRustPythonFile {
200    pub fn into_dyn(self) -> Box<dyn FileLike> {
201        match self {
202            EitherRustPythonFile::Py(f) => Box::new(f),
203            EitherRustPythonFile::Rust(f) => Box::new(f),
204        }
205    }
206
207    fn into_scan_source_input(self) -> PythonScanSourceInput {
208        match self {
209            EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
210            EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
211        }
212    }
213
214    pub fn into_dyn_writeable(self) -> Box<dyn Write + Send> {
215        match self {
216            EitherRustPythonFile::Py(f) => Box::new(f),
217            EitherRustPythonFile::Rust(f) => Box::new(f),
218        }
219    }
220}
221
/// The forms a scan source received from Python can be resolved into.
pub enum PythonScanSourceInput {
    /// Bytes that are already in memory.
    Buffer(MemSlice),
    /// A path on the file system.
    Path(PathBuf),
    /// An open native file handle.
    File(File),
}
227
/// Turns a Python file object into an `EitherRustPythonFile`.
///
/// On unix, exact instances of the standard `io` classes listed below are
/// converted into a native `File` by duplicating their file descriptor; the
/// second tuple element is then the path resolved through `/proc/self/fd`, if
/// that lookup succeeds. Every other object is wrapped in a
/// `PyFileLikeObject` and returned with `None` as the path.
///
/// # Arguments
/// * `write` - when `true` the object must provide `write`; otherwise it must
///   provide `read` and `seek`.
///
/// # Errors
/// Fails if the `io` module cannot be imported, if a `TextIOWrapper` does not
/// report a UTF-8 encoding, if flushing/seeking a `TextIOWrapper` raises, or
/// if the object lacks one of the required methods.
fn try_get_pyfile(
    py: Python,
    py_f: Bound<'_, PyAny>,
    write: bool,
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
    let io = py.import("io")?;
    // True when the object's `encoding` attribute names UTF-8 (case-insensitive).
    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
        let encoding = py_f.getattr("encoding")?;
        let encoding = encoding.extract::<Cow<str>>()?;
        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
    };

    // Fast path: reuse the underlying file descriptor.
    // A raw `FileIO` qualifies on its own; the buffered classes and a UTF-8
    // `TextIOWrapper` additionally need the `flush` / `seek` call below to
    // succeed. If any step fails, control falls through to the generic path.
    #[cfg(target_family = "unix")]
    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
                && is_utf8_encoding(&py_f)?))
            && if write {
                // invalidate read buffer
                py_f.call_method0("flush").is_ok()
            } else {
                // flush write buffer
                py_f.call_method1("seek", (0, 1)).is_ok()
            })
    .then(|| {
        py_f.getattr("fileno")
            .and_then(|fileno| fileno.call0())
            .and_then(|fileno| fileno.extract::<libc::c_int>())
            .ok()
    })
    .flatten()
    .map(|fileno| unsafe {
        // `File::from_raw_fd()` takes the ownership of the file descriptor.
        // When the File is dropped, it closes the file descriptor.
        // This is undesired - the Python file object will become invalid.
        // Therefore, we duplicate the file descriptor here.
        // Closing the duplicated file descriptor will not close
        // the original file descriptor;
        // and the status, e.g. stream position, is still shared with
        // the original file descriptor.
        // We use `F_DUPFD_CLOEXEC` here instead of `dup()`
        // because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
        // which `dup()` clears.
        // `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
        // it prevents leaking file descriptors across processes,
        // and we want to be consistent with them.
        // `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
        // and is present on all alive UNIX(-like) systems.
        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
    })
    // A result of -1 means the duplication failed.
    .filter(|fileno| *fileno != -1)
    .map(|fileno| fileno as RawFd)
    {
        return Ok((
            // The `File` takes ownership of the duplicated descriptor.
            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd) }),
            // This works on Linux and BSD with procfs mounted,
            // otherwise it fails silently.
            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
        ));
    }

    // Unwrap TextIOWrapper
    // Allow subclasses to allow things like pytest.capture.CaptureIO
    let py_f = if py_f
        .is_instance(&io.getattr("TextIOWrapper").unwrap())
        .unwrap_or_default()
    {
        if !is_utf8_encoding(&py_f)? {
            return Err(PyPolarsErr::from(
                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
            )
            .into());
        }
        // XXX: we have to clear buffer here.
        // Is there a better solution?
        if write {
            py_f.call_method0("flush")?;
        } else {
            py_f.call_method1("seek", (0, 1))?;
        }
        // Continue with the wrapper's `buffer` attribute instead of the text layer.
        py_f.getattr("buffer")?
    } else {
        py_f
    };
    // Readers need `read` and `seek`; writers need `write`.
    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
    let f = PyFileLikeObject::new(py_f.unbind());
    Ok((EitherRustPythonFile::Py(f), None))
}
319
320pub fn get_python_scan_source_input(
321    py_f: PyObject,
322    write: bool,
323) -> PyResult<PythonScanSourceInput> {
324    Python::with_gil(|py| {
325        let py_f = py_f.into_bound(py);
326
327        // CPython has some internal tricks that means much of the time
328        // BytesIO.getvalue() involves no memory copying, unlike
329        // BytesIO.read(). So we want to handle BytesIO specially in order
330        // to save memory.
331        let py_f = read_if_bytesio(py_f);
332
333        // If the pyobject is a `bytes` class
334        if let Ok(b) = py_f.downcast::<PyBytes>() {
335            return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
336                b.as_bytes(),
337                // We want to specifically keep alive the PyBytes object.
338                Arc::new(b.clone().unbind()),
339            )));
340        }
341
342        if let Ok(s) = py_f.extract::<Cow<str>>() {
343            let file_path = resolve_homedir(&&*s);
344            Ok(PythonScanSourceInput::Path(file_path))
345        } else {
346            Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
347        }
348    })
349}
350
351fn get_either_buffer_or_path(
352    py_f: PyObject,
353    write: bool,
354) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
355    Python::with_gil(|py| {
356        let py_f = py_f.into_bound(py);
357        if let Ok(s) = py_f.extract::<Cow<str>>() {
358            let file_path = resolve_homedir(&&*s);
359            let f = if write {
360                create_file(&file_path).map_err(PyPolarsErr::from)?
361            } else {
362                polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
363            };
364            Ok((EitherRustPythonFile::Rust(f), Some(file_path)))
365        } else {
366            try_get_pyfile(py, py_f, write)
367        }
368    })
369}
370
371///
372/// # Arguments
373/// * `write` - open for writing; will truncate existing file and create new file if not.
374pub fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
375    Ok(get_either_buffer_or_path(py_f, write)?.0)
376}
377
378pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
379    Ok(get_either_file(f, truncate)?.into_dyn())
380}
381
382/// If the give file-like is a BytesIO, read its contents in a memory-efficient
383/// way.
384fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
385    let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
386    if py_f.is_instance(&bytes_io).unwrap() {
387        // Note that BytesIO has some memory optimizations ensuring that much of
388        // the time getvalue() doesn't need to copy the underlying data:
389        let Ok(bytes) = py_f.call_method0("getvalue") else {
390            return py_f;
391        };
392        return bytes;
393    }
394    py_f
395}
396
397/// Create reader from PyBytes or a file-like object.
398pub fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
399    get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
400}
401
402pub fn get_mmap_bytes_reader_and_path(
403    py_f: &Bound<PyAny>,
404) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
405    let py_f = read_if_bytesio(py_f.clone());
406
407    // bytes object
408    if let Ok(bytes) = py_f.downcast::<PyBytes>() {
409        Ok((
410            Box::new(Cursor::new(MemSlice::from_arc(
411                bytes.as_bytes(),
412                Arc::new(py_f.clone().unbind()),
413            ))),
414            None,
415        ))
416    }
417    // string so read file
418    else {
419        match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
420            (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
421            (EitherRustPythonFile::Py(f), path) => {
422                Ok((Box::new(Cursor::new(f.to_memslice())), path))
423            },
424        }
425    }
426}
427
428pub fn try_get_writeable(
429    py_f: PyObject,
430    cloud_options: Option<&CloudOptions>,
431) -> PyResult<Box<dyn Write + Send>> {
432    Python::with_gil(|py| {
433        let py_f = py_f.into_bound(py);
434
435        if let Ok(s) = py_f.extract::<Cow<str>>() {
436            polars::prelude::file::try_get_writeable(&s, cloud_options)
437                .map_err(PyPolarsErr::from)
438                .map_err(|e| e.into())
439        } else {
440            Ok(try_get_pyfile(py, py_f, true)?.0.into_dyn_writeable())
441        }
442    })
443}