polars_python/
file.rs

1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars::prelude::file::DynWriteable;
14use polars::prelude::sync_on_close::SyncOnCloseType;
15use polars_error::polars_err;
16use polars_utils::create_file;
17use polars_utils::file::{ClosableFile, WriteClose};
18use polars_utils::mmap::MemSlice;
19use pyo3::IntoPyObjectExt;
20use pyo3::exceptions::PyTypeError;
21use pyo3::prelude::*;
22use pyo3::types::{PyBytes, PyString, PyStringMethods};
23
24use crate::error::PyPolarsErr;
25use crate::prelude::resolve_homedir;
26
/// A Python object treated as a file: the `Read`, `Write` and `Seek` impls in
/// this file forward to the object's `read`, `write`, `flush` and `seek`
/// methods while holding the GIL.
pub(crate) struct PyFileLikeObject {
    /// The wrapped Python object that all I/O calls are forwarded to.
    inner: PyObject,
    /// The object expects a string instead of a bytes for `write`.
    expects_str: bool,
    /// The object has a flush method.
    has_flush: bool,
}
34
35impl WriteClose for PyFileLikeObject {}
36impl DynWriteable for PyFileLikeObject {
37    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
38        self as _
39    }
40    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
41        self as _
42    }
43    fn close(self: Box<Self>) -> io::Result<()> {
44        Ok(())
45    }
46    fn sync_on_close(&mut self, _sync_on_close: SyncOnCloseType) -> io::Result<()> {
47        Ok(())
48    }
49}
50
51impl Clone for PyFileLikeObject {
52    fn clone(&self) -> Self {
53        Python::with_gil(|py| Self {
54            inner: self.inner.clone_ref(py),
55            expects_str: self.expects_str,
56            has_flush: self.has_flush,
57        })
58    }
59}
60
61/// Wraps a `PyObject`, and implements read, seek, and write for it.
62impl PyFileLikeObject {
63    /// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
64    /// To assert the object has the required methods,
65    /// instantiate it with `PyFileLikeObject::require`
66    pub(crate) fn new(object: PyObject, expects_str: bool, has_flush: bool) -> Self {
67        PyFileLikeObject {
68            inner: object,
69            expects_str,
70            has_flush,
71        }
72    }
73
74    pub(crate) fn to_memslice(&self) -> MemSlice {
75        Python::with_gil(|py| {
76            let bytes = self
77                .inner
78                .call_method(py, "read", (), None)
79                .expect("no read method found");
80
81            if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
82                return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
83            }
84
85            if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
86                return match b.to_cow().expect("PyString is not valid UTF-8") {
87                    Cow::Borrowed(v) => {
88                        MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
89                    },
90                    Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
91                };
92            }
93
94            panic!("Expecting to be able to downcast into bytes from read result.");
95        })
96    }
97
98    /// Validates that the underlying
99    /// python object has a `read`, `write`, and `seek` methods in respect to parameters.
100    /// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
101    pub(crate) fn ensure_requirements(
102        object: &Bound<PyAny>,
103        read: bool,
104        write: bool,
105        seek: bool,
106    ) -> PyResult<()> {
107        if read && object.getattr("read").is_err() {
108            return Err(PyErr::new::<PyTypeError, _>(
109                "Object does not have a .read() method.",
110            ));
111        }
112
113        if seek && object.getattr("seek").is_err() {
114            return Err(PyErr::new::<PyTypeError, _>(
115                "Object does not have a .seek() method.",
116            ));
117        }
118
119        if write && object.getattr("write").is_err() {
120            return Err(PyErr::new::<PyTypeError, _>(
121                "Object does not have a .write() method.",
122            ));
123        }
124
125        Ok(())
126    }
127}
128
129/// Extracts a string repr from, and returns an IO error to send back to rust.
130fn pyerr_to_io_err(e: PyErr) -> io::Error {
131    Python::with_gil(|py| {
132        let e_as_object: PyObject = e.into_py_any(py).unwrap();
133
134        match e_as_object.call_method(py, "__str__", (), None) {
135            Ok(repr) => match repr.extract::<String>(py) {
136                Ok(s) => io::Error::other(s),
137                Err(_e) => io::Error::other("An unknown error has occurred"),
138            },
139            Err(_) => io::Error::other("Err doesn't have __str__"),
140        }
141    })
142}
143
144impl Read for PyFileLikeObject {
145    fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
146        Python::with_gil(|py| {
147            let bytes = self
148                .inner
149                .call_method(py, "read", (buf.len(),), None)
150                .map_err(pyerr_to_io_err)?;
151
152            let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
153
154            if let Ok(bytes) = opt_bytes {
155                buf.write_all(bytes.as_bytes())?;
156
157                bytes.len().map_err(pyerr_to_io_err)
158            } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
159                let s = s.to_cow().map_err(pyerr_to_io_err)?;
160                buf.write_all(s.as_bytes())?;
161                Ok(s.len())
162            } else {
163                Err(io::Error::new(
164                    ErrorKind::InvalidInput,
165                    polars_err!(InvalidOperation: "could not read from input"),
166                ))
167            }
168        })
169    }
170}
171
172impl Write for PyFileLikeObject {
173    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
174        Python::with_gil(|py| {
175            let number_bytes_written = if self.expects_str {
176                self.inner.call_method(
177                    py,
178                    "write",
179                    (PyString::new(
180                        py,
181                        std::str::from_utf8(buf).map_err(io::Error::other)?,
182                    ),),
183                    None,
184                )
185            } else {
186                self.inner
187                    .call_method(py, "write", (PyBytes::new(py, buf),), None)
188            }
189            .map_err(pyerr_to_io_err)?;
190
191            let n = number_bytes_written.extract(py).map_err(pyerr_to_io_err)?;
192
193            Ok(n)
194        })
195    }
196
197    fn flush(&mut self) -> Result<(), io::Error> {
198        if self.has_flush {
199            Python::with_gil(|py| {
200                self.inner
201                    .call_method(py, "flush", (), None)
202                    .map_err(pyerr_to_io_err)
203            })?;
204        }
205
206        Ok(())
207    }
208}
209
210impl Seek for PyFileLikeObject {
211    fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
212        Python::with_gil(|py| {
213            let (whence, offset) = match pos {
214                SeekFrom::Start(i) => (0, i as i64),
215                SeekFrom::Current(i) => (1, i),
216                SeekFrom::End(i) => (2, i),
217            };
218
219            let new_position = self
220                .inner
221                .call_method(py, "seek", (offset, whence), None)
222                .map_err(pyerr_to_io_err)?;
223
224            new_position.extract(py).map_err(pyerr_to_io_err)
225        })
226    }
227}
228
/// Bundle of `Read + Write + Seek` that is also `Send + Sync`, so that
/// different file types can be handed around as `Box<dyn FileLike>`.
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

// File types that may be used behind `dyn FileLike`.
impl FileLike for File {}
impl FileLike for ClosableFile {}
impl FileLike for PyFileLikeObject {}
// No items are overridden, so the trait's provided defaults apply.
impl MmapBytesReader for PyFileLikeObject {}
235
/// A file handle that is backed either by a Python object or by a Rust file.
pub(crate) enum EitherRustPythonFile {
    /// A Python file-like object; I/O is forwarded to its methods.
    Py(PyFileLikeObject),
    /// A Rust-side file handle.
    Rust(ClosableFile),
}
240
241impl EitherRustPythonFile {
242    pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
243        match self {
244            EitherRustPythonFile::Py(f) => Box::new(f),
245            EitherRustPythonFile::Rust(f) => Box::new(f),
246        }
247    }
248
249    fn into_scan_source_input(self) -> PythonScanSourceInput {
250        match self {
251            EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
252            EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
253        }
254    }
255
256    pub(crate) fn into_writeable(self) -> Box<dyn DynWriteable> {
257        match self {
258            Self::Py(f) => Box::new(f),
259            Self::Rust(f) => Box::new(f),
260        }
261    }
262}
263
/// The forms a scan source received from Python can take once resolved.
pub(crate) enum PythonScanSourceInput {
    /// Bytes already held in memory.
    Buffer(MemSlice),
    /// A filesystem path.
    Path(PathBuf),
    /// An open Rust-side file handle.
    File(ClosableFile),
}
269
270pub(crate) fn try_get_pyfile(
271    py: Python<'_>,
272    py_f: Bound<'_, PyAny>,
273    write: bool,
274) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
275    let io = py.import("io")?;
276    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
277        let encoding = py_f.getattr("encoding")?;
278        let encoding = encoding.extract::<Cow<str>>()?;
279        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
280    };
281
282    #[cfg(target_family = "unix")]
283    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
284        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
285            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
286            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
287            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
288            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
289                && is_utf8_encoding(&py_f)?))
290            && if write {
291                // invalidate read buffer
292                py_f.call_method0("flush").is_ok()
293            } else {
294                // flush write buffer
295                py_f.call_method1("seek", (0, 1)).is_ok()
296            })
297    .then(|| {
298        py_f.getattr("fileno")
299            .and_then(|fileno| fileno.call0())
300            .and_then(|fileno| fileno.extract::<libc::c_int>())
301            .ok()
302    })
303    .flatten()
304    .map(|fileno| unsafe {
305        // `File::from_raw_fd()` takes the ownership of the file descriptor.
306        // When the File is dropped, it closes the file descriptor.
307        // This is undesired - the Python file object will become invalid.
308        // Therefore, we duplicate the file descriptor here.
309        // Closing the duplicated file descriptor will not close
310        // the original file descriptor;
311        // and the status, e.g. stream position, is still shared with
312        // the original file descriptor.
313        // We use `F_DUPFD_CLOEXEC` here instead of `dup()`
314        // because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
315        // which `dup()` clears.
316        // `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
317        // it prevents leaking file descriptors across processes,
318        // and we want to be consistent with them.
319        // `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
320        // and is present on all alive UNIX(-like) systems.
321        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
322    })
323    .filter(|fileno| *fileno != -1)
324    .map(|fileno| fileno as RawFd)
325    {
326        return Ok((
327            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
328            // This works on Linux and BSD with procfs mounted,
329            // otherwise it fails silently.
330            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
331        ));
332    }
333
334    // Unwrap TextIOWrapper
335    // Allow subclasses to allow things like pytest.capture.CaptureIO
336    let py_f = if py_f
337        .is_instance(&io.getattr("TextIOWrapper").unwrap())
338        .unwrap_or_default()
339    {
340        if !is_utf8_encoding(&py_f)? {
341            return Err(PyPolarsErr::from(
342                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
343            )
344            .into());
345        }
346        // XXX: we have to clear buffer here.
347        // Is there a better solution?
348        if write {
349            py_f.call_method0("flush")?;
350        } else {
351            py_f.call_method1("seek", (0, 1))?;
352        }
353        py_f.getattr("buffer")?
354    } else {
355        py_f
356    };
357    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
358    let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
359    let has_flush = py_f
360        .getattr_opt("flush")?
361        .is_some_and(|flush| flush.is_callable());
362    let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
363    Ok((EitherRustPythonFile::Py(f), None))
364}
365
366pub(crate) fn get_python_scan_source_input(
367    py_f: PyObject,
368    write: bool,
369) -> PyResult<PythonScanSourceInput> {
370    Python::with_gil(|py| {
371        let py_f = py_f.into_bound(py);
372
373        // CPython has some internal tricks that means much of the time
374        // BytesIO.getvalue() involves no memory copying, unlike
375        // BytesIO.read(). So we want to handle BytesIO specially in order
376        // to save memory.
377        let py_f = read_if_bytesio(py_f);
378
379        // If the pyobject is a `bytes` class
380        if let Ok(b) = py_f.downcast::<PyBytes>() {
381            return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
382                b.as_bytes(),
383                // We want to specifically keep alive the PyBytes object.
384                Arc::new(b.clone().unbind()),
385            )));
386        }
387
388        if let Ok(s) = py_f.extract::<Cow<str>>() {
389            let file_path = resolve_homedir(&&*s);
390            Ok(PythonScanSourceInput::Path(file_path))
391        } else {
392            Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
393        }
394    })
395}
396
397fn get_either_buffer_or_path(
398    py_f: PyObject,
399    write: bool,
400) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
401    Python::with_gil(|py| {
402        let py_f = py_f.into_bound(py);
403        if let Ok(s) = py_f.extract::<Cow<str>>() {
404            let file_path = resolve_homedir(&&*s);
405            let f = if write {
406                create_file(&file_path).map_err(PyPolarsErr::from)?
407            } else {
408                polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
409            };
410            Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
411        } else {
412            try_get_pyfile(py, py_f, write)
413        }
414    })
415}
416
417///
418/// # Arguments
419/// * `write` - open for writing; will truncate existing file and create new file if not.
420pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
421    Ok(get_either_buffer_or_path(py_f, write)?.0)
422}
423
424pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
425    Ok(get_either_file(f, truncate)?.into_dyn())
426}
427
428/// If the give file-like is a BytesIO, read its contents in a memory-efficient
429/// way.
430fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
431    let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
432    if py_f.is_instance(&bytes_io).unwrap() {
433        // Note that BytesIO has some memory optimizations ensuring that much of
434        // the time getvalue() doesn't need to copy the underlying data:
435        let Ok(bytes) = py_f.call_method0("getvalue") else {
436            return py_f;
437        };
438        return bytes;
439    }
440    py_f
441}
442
443/// Create reader from PyBytes or a file-like object.
444pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
445    get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
446}
447
448pub(crate) fn get_mmap_bytes_reader_and_path(
449    py_f: &Bound<PyAny>,
450) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
451    let py_f = read_if_bytesio(py_f.clone());
452
453    // bytes object
454    if let Ok(bytes) = py_f.downcast::<PyBytes>() {
455        Ok((
456            Box::new(Cursor::new(MemSlice::from_arc(
457                bytes.as_bytes(),
458                Arc::new(py_f.clone().unbind()),
459            ))),
460            None,
461        ))
462    }
463    // string so read file
464    else {
465        match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
466            (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
467            (EitherRustPythonFile::Py(f), path) => {
468                Ok((Box::new(Cursor::new(f.to_memslice())), path))
469            },
470        }
471    }
472}