Skip to main content

polars_python/
file.rs

1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10
11use polars::io::mmap::MmapBytesReader;
12use polars::prelude::PlRefPath;
13use polars::prelude::file::{Writeable, WriteableTrait};
14use polars_buffer::{Buffer, SharedStorage};
15use polars_error::polars_err;
16use polars_utils::create_file;
17use pyo3::IntoPyObjectExt;
18use pyo3::exceptions::PyTypeError;
19use pyo3::prelude::*;
20use pyo3::types::{PyBytes, PyString, PyStringMethods};
21
22use crate::error::PyPolarsErr;
23use crate::prelude::resolve_homedir;
24use crate::utils::to_py_err;
25
/// A Rust handle to a Python file-like object.
///
/// The `Read`, `Write` and `Seek` impls in this file forward each operation to the
/// corresponding Python method (`read`, `write`, `seek`, `flush`).
pub(crate) struct PyFileLikeObject {
    /// The wrapped Python object.
    inner: Py<PyAny>,
    /// The object expects a `str` instead of `bytes` for `write`.
    expects_str: bool,
    /// The object has a `flush` method; when `false`, flushing does nothing.
    has_flush: bool,
}
33
34impl WriteableTrait for PyFileLikeObject {
35    fn close(&mut self) -> io::Result<()> {
36        Ok(())
37    }
38
39    fn sync_all(&self) -> std::io::Result<()> {
40        self.flush()
41    }
42
43    fn sync_data(&self) -> std::io::Result<()> {
44        self.flush()
45    }
46}
47
48impl Clone for PyFileLikeObject {
49    fn clone(&self) -> Self {
50        Python::attach(|py| Self {
51            inner: self.inner.clone_ref(py),
52            expects_str: self.expects_str,
53            has_flush: self.has_flush,
54        })
55    }
56}
57
58/// Wraps a `PyObject`, and implements read, seek, and write for it.
59impl PyFileLikeObject {
60    /// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
61    /// To assert the object has the required methods,
62    /// instantiate it with `PyFileLikeObject::require`
63    pub(crate) fn new(object: Py<PyAny>, expects_str: bool, has_flush: bool) -> Self {
64        PyFileLikeObject {
65            inner: object,
66            expects_str,
67            has_flush,
68        }
69    }
70
71    pub(crate) fn to_buffer(&self) -> Buffer<u8> {
72        Python::attach(|py| {
73            let bytes = self
74                .inner
75                .call_method(py, "read", (), None)
76                .expect("no read method found");
77
78            if let Ok(b) = bytes.cast_bound::<PyBytes>(py) {
79                // SAFETY: we keep the underlying python object alive.
80                let slice = b.as_bytes();
81                let owner = bytes.clone_ref(py);
82                let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
83                return Buffer::from_storage(ss);
84            }
85
86            if let Ok(b) = bytes.cast_bound::<PyString>(py) {
87                return match b.to_cow().expect("PyString is not valid UTF-8") {
88                    Cow::Borrowed(v) => {
89                        // SAFETY: we keep the underlying python object alive.
90                        let slice = v.as_bytes();
91                        let owner = bytes.clone_ref(py);
92                        let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
93                        return Buffer::from_storage(ss);
94                    },
95                    Cow::Owned(v) => Buffer::from_vec(v.into_bytes()),
96                };
97            }
98
99            panic!("Expecting to be able to downcast into bytes from read result.");
100        })
101    }
102
103    /// Validates that the underlying
104    /// python object has a `read`, `write`, and `seek` methods in respect to parameters.
105    /// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
106    pub(crate) fn ensure_requirements(
107        object: &Bound<PyAny>,
108        read: bool,
109        write: bool,
110        seek: bool,
111    ) -> PyResult<()> {
112        if read && object.getattr("read").is_err() {
113            return Err(PyErr::new::<PyTypeError, _>(
114                "Object does not have a .read() method.",
115            ));
116        }
117
118        if seek && object.getattr("seek").is_err() {
119            return Err(PyErr::new::<PyTypeError, _>(
120                "Object does not have a .seek() method.",
121            ));
122        }
123
124        if write && object.getattr("write").is_err() {
125            return Err(PyErr::new::<PyTypeError, _>(
126                "Object does not have a .write() method.",
127            ));
128        }
129
130        Ok(())
131    }
132
133    pub fn flush(&self) -> std::io::Result<()> {
134        if self.has_flush {
135            Python::attach(|py| {
136                self.inner
137                    .call_method(py, "flush", (), None)
138                    .map_err(pyerr_to_io_err)
139            })?;
140        }
141
142        Ok(())
143    }
144}
145
146/// Extracts a string repr from, and returns an IO error to send back to rust.
147fn pyerr_to_io_err(e: PyErr) -> io::Error {
148    Python::attach(|py| {
149        let e_as_object: Py<PyAny> = e.into_py_any(py).unwrap();
150
151        match e_as_object.call_method(py, "__str__", (), None) {
152            Ok(repr) => match repr.extract::<String>(py) {
153                Ok(s) => io::Error::other(s),
154                Err(_e) => io::Error::other("An unknown error has occurred"),
155            },
156            Err(_) => io::Error::other("Err doesn't have __str__"),
157        }
158    })
159}
160
161impl Read for PyFileLikeObject {
162    fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
163        Python::attach(|py| {
164            let bytes = self
165                .inner
166                .call_method(py, "read", (buf.len(),), None)
167                .map_err(pyerr_to_io_err)?;
168
169            let opt_bytes = bytes.cast_bound::<PyBytes>(py);
170
171            if let Ok(bytes) = opt_bytes {
172                buf.write_all(bytes.as_bytes())?;
173
174                bytes.len().map_err(pyerr_to_io_err)
175            } else if let Ok(s) = bytes.cast_bound::<PyString>(py) {
176                let s = s.to_cow().map_err(pyerr_to_io_err)?;
177                buf.write_all(s.as_bytes())?;
178                Ok(s.len())
179            } else {
180                Err(io::Error::new(
181                    ErrorKind::InvalidInput,
182                    polars_err!(InvalidOperation: "could not read from input"),
183                ))
184            }
185        })
186    }
187}
188
189impl Write for PyFileLikeObject {
190    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
191        // Note on the .extract() method:
192        // In case of a PyString object, it returns the number of chars,
193        // so we need to take extra steps if the underlying string is not all ASCII.
194        // In case of a ByBytes object, it returns the number of bytes.
195        let expects_str = self.expects_str;
196        let expects_str_and_is_ascii = expects_str && buf.is_ascii();
197
198        Python::attach(|py| {
199            let n_bytes = if expects_str_and_is_ascii {
200                let number_chars_written = unsafe {
201                    self.inner.call_method(
202                        py,
203                        "write",
204                        (PyString::new(py, std::str::from_utf8_unchecked(buf)),),
205                        None,
206                    )
207                }
208                .map_err(pyerr_to_io_err)?;
209                number_chars_written.extract(py).map_err(pyerr_to_io_err)?
210            } else if expects_str {
211                let number_chars_written = self
212                    .inner
213                    .call_method(
214                        py,
215                        "write",
216                        (PyString::new(
217                            py,
218                            std::str::from_utf8(buf).map_err(io::Error::other)?,
219                        ),),
220                        None,
221                    )
222                    .map_err(pyerr_to_io_err)?;
223                let n_chars: usize = number_chars_written.extract(py).map_err(pyerr_to_io_err)?;
224                // calculate n_bytes
225                if n_chars > 0 {
226                    std::str::from_utf8(buf)
227                        .map(|str| {
228                            str.char_indices()
229                                .nth(n_chars - 1)
230                                .map(|(i, ch)| i + ch.len_utf8())
231                                .unwrap()
232                        })
233                        .expect("unable to parse buffer as utf-8")
234                } else {
235                    0
236                }
237            } else {
238                let number_bytes_written = self
239                    .inner
240                    .call_method(py, "write", (PyBytes::new(py, buf),), None)
241                    .map_err(pyerr_to_io_err)?;
242                number_bytes_written.extract(py).map_err(pyerr_to_io_err)?
243            };
244            Ok(n_bytes)
245        })
246    }
247
248    fn flush(&mut self) -> Result<(), io::Error> {
249        Self::flush(self)
250    }
251}
252
253impl Seek for PyFileLikeObject {
254    fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
255        Python::attach(|py| {
256            let (whence, offset) = match pos {
257                SeekFrom::Start(i) => (0, i as i64),
258                SeekFrom::Current(i) => (1, i),
259                SeekFrom::End(i) => (2, i),
260            };
261
262            let new_position = self
263                .inner
264                .call_method(py, "seek", (offset, whence), None)
265                .map_err(pyerr_to_io_err)?;
266
267            new_position.extract(py).map_err(pyerr_to_io_err)
268        })
269    }
270}
271
/// A readable, writable and seekable file that can be shared across threads
/// (`Sync + Send`), implemented both by native files and by Python file-likes.
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

impl FileLike for File {}
impl FileLike for PyFileLikeObject {}
// Empty impl: relies on `MmapBytesReader`'s default method implementations.
impl MmapBytesReader for PyFileLikeObject {}
277
/// A file handle that is either a Python file-like object or a native Rust file.
pub(crate) enum EitherRustPythonFile {
    /// Operations are forwarded to a Python object via [`PyFileLikeObject`].
    Py(PyFileLikeObject),
    /// A native file, opened from a path or duplicated from a Python object's file
    /// descriptor.
    Rust(std::fs::File),
}
282
283impl EitherRustPythonFile {
284    pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
285        match self {
286            EitherRustPythonFile::Py(f) => Box::new(f),
287            EitherRustPythonFile::Rust(f) => Box::new(f),
288        }
289    }
290
291    fn into_scan_source_input(self) -> PythonScanSourceInput {
292        match self {
293            EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_buffer()),
294            EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
295        }
296    }
297
298    pub(crate) fn into_writeable(self) -> Writeable {
299        match self {
300            Self::Py(f) => Writeable::Dyn(Box::new(f)),
301            Self::Rust(f) => Writeable::Local(f),
302        }
303    }
304}
305
/// The resolved source of a scan, as produced by `get_python_scan_source_input`.
pub(crate) enum PythonScanSourceInput {
    /// In-memory data: a `bytes` object, `BytesIO` contents, or the full contents read
    /// from a Python file-like object.
    Buffer(Buffer<u8>),
    /// A path given as a Python string (after `resolve_homedir`).
    Path(PlRefPath),
    /// A native file handle.
    File(std::fs::File),
}
311
312pub(crate) fn try_get_pyfile(
313    py: Python<'_>,
314    py_f: Bound<'_, PyAny>,
315    write: bool,
316) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
317    let io = py.import("io")?;
318    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
319        let encoding = py_f.getattr("encoding")?;
320        let encoding = encoding.extract::<Cow<str>>()?;
321        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
322    };
323
324    #[cfg(target_family = "unix")]
325    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
326        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
327            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
328            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
329            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
330            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
331                && is_utf8_encoding(&py_f)?))
332            && if write {
333                // invalidate read buffer
334                py_f.call_method0("flush").is_ok()
335            } else {
336                // flush write buffer
337                py_f.call_method1("seek", (0, 1)).is_ok()
338            })
339    .then(|| {
340        py_f.getattr("fileno")
341            .and_then(|fileno| fileno.call0())
342            .and_then(|fileno| fileno.extract::<libc::c_int>())
343            .ok()
344    })
345    .flatten()
346    .map(|fileno| unsafe {
347        // `File::from_raw_fd()` takes the ownership of the file descriptor.
348        // When the File is dropped, it closes the file descriptor.
349        // This is undesired - the Python file object will become invalid.
350        // Therefore, we duplicate the file descriptor here.
351        // Closing the duplicated file descriptor will not close
352        // the original file descriptor;
353        // and the status, e.g. stream position, is still shared with
354        // the original file descriptor.
355        // We use `F_DUPFD_CLOEXEC` here instead of `dup()`
356        // because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
357        // which `dup()` clears.
358        // `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
359        // it prevents leaking file descriptors across processes,
360        // and we want to be consistent with them.
361        // `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
362        // and is present on all alive UNIX(-like) systems.
363        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
364    })
365    .filter(|fileno| *fileno != -1)
366    .map(|fileno| fileno as RawFd)
367    {
368        return Ok((
369            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
370            // This works on Linux and BSD with procfs mounted,
371            // otherwise it fails silently.
372            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
373        ));
374    }
375
376    // Unwrap TextIOWrapper
377    // Allow subclasses to allow things like pytest.capture.CaptureIO
378    let py_f = if py_f
379        .is_instance(&io.getattr("TextIOWrapper").unwrap())
380        .unwrap_or_default()
381    {
382        if !is_utf8_encoding(&py_f)? {
383            return Err(PyPolarsErr::from(
384                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
385            )
386            .into());
387        }
388        // XXX: we have to clear buffer here.
389        // Is there a better solution?
390        if write {
391            py_f.call_method0("flush")?;
392        } else {
393            py_f.call_method1("seek", (0, 1))?;
394        }
395        py_f.getattr("buffer")?
396    } else {
397        py_f
398    };
399    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
400    let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
401    let has_flush = py_f
402        .getattr_opt("flush")?
403        .is_some_and(|flush| flush.is_callable());
404    let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
405    Ok((EitherRustPythonFile::Py(f), None))
406}
407
408pub(crate) fn get_python_scan_source_input(
409    py_f: Py<PyAny>,
410    write: bool,
411) -> PyResult<PythonScanSourceInput> {
412    Python::attach(|py| {
413        let py_f = py_f.into_bound(py);
414
415        // CPython has some internal tricks that means much of the time
416        // BytesIO.getvalue() involves no memory copying, unlike
417        // BytesIO.read(). So we want to handle BytesIO specially in order
418        // to save memory.
419        let py_f = read_if_bytesio(py_f);
420
421        // If the pyobject is a `bytes` class
422        if let Ok(b) = py_f.cast::<PyBytes>() {
423            // SAFETY: we keep the underlying python object alive.
424            let slice = b.as_bytes();
425            let owner = b.clone().unbind();
426            let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
427            let buffer = Buffer::from_storage(ss);
428            return Ok(PythonScanSourceInput::Buffer(buffer));
429        }
430
431        if let Ok(s) = py_f.extract::<Cow<str>>() {
432            let file_path = PlRefPath::try_from_path(resolve_homedir(s.as_ref()).as_ref())
433                .map_err(to_py_err)?;
434
435            Ok(PythonScanSourceInput::Path(file_path))
436        } else {
437            Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
438        }
439    })
440}
441
442fn get_either_buffer_or_path(
443    py_f: Py<PyAny>,
444    write: bool,
445) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
446    Python::attach(|py| {
447        let py_f = py_f.into_bound(py);
448        if let Ok(s) = py_f.extract::<Cow<str>>() {
449            let file_path = resolve_homedir(s.as_ref());
450            let f = if write {
451                create_file(&file_path).map_err(PyPolarsErr::from)?
452            } else {
453                polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
454            };
455            Ok((
456                EitherRustPythonFile::Rust(f.into()),
457                Some(file_path.into_owned()),
458            ))
459        } else {
460            try_get_pyfile(py, py_f, write)
461        }
462    })
463}
464
465///
466/// # Arguments
467/// * `write` - open for writing; will truncate existing file and create new file if not.
468pub(crate) fn get_either_file(py_f: Py<PyAny>, write: bool) -> PyResult<EitherRustPythonFile> {
469    Ok(get_either_buffer_or_path(py_f, write)?.0)
470}
471
472pub(crate) fn get_file_like(f: Py<PyAny>, truncate: bool) -> PyResult<Box<dyn FileLike>> {
473    Ok(get_either_file(f, truncate)?.into_dyn())
474}
475
476/// If the give file-like is a BytesIO, read its contents in a memory-efficient
477/// way.
478fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
479    let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
480    if py_f.is_instance(&bytes_io).unwrap() {
481        // Note that BytesIO has some memory optimizations ensuring that much of
482        // the time getvalue() doesn't need to copy the underlying data:
483        let Ok(bytes) = py_f.call_method0("getvalue") else {
484            return py_f;
485        };
486        return bytes;
487    }
488    py_f
489}
490
491/// Create reader from PyBytes or a file-like object.
492pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
493    get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
494}
495
496pub(crate) fn get_mmap_bytes_reader_and_path(
497    py_f: &Bound<PyAny>,
498) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
499    let py_f = read_if_bytesio(py_f.clone());
500
501    // bytes object
502    if let Ok(bytes) = py_f.cast::<PyBytes>() {
503        // SAFETY: we keep the underlying python object alive.
504        let slice = bytes.as_bytes();
505        let owner = bytes.clone().unbind();
506        let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
507        Ok((Box::new(Cursor::new(Buffer::from_storage(ss))), None))
508    }
509    // string so read file
510    else {
511        match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
512            (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
513            (EitherRustPythonFile::Py(f), path) => Ok((Box::new(Cursor::new(f.to_buffer())), path)),
514        }
515    }
516}