polars_python/
file.rs

1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars::prelude::PlPath;
14use polars::prelude::file::DynWriteable;
15use polars::prelude::sync_on_close::SyncOnCloseType;
16use polars_error::polars_err;
17use polars_utils::create_file;
18use polars_utils::file::{ClosableFile, WriteClose};
19use polars_utils::mmap::MemSlice;
20use pyo3::IntoPyObjectExt;
21use pyo3::exceptions::PyTypeError;
22use pyo3::prelude::*;
23use pyo3::types::{PyBytes, PyString, PyStringMethods};
24
25use crate::error::PyPolarsErr;
26use crate::prelude::resolve_homedir;
27
/// A Python object that is used as a file from Rust.
///
/// `Read`, `Write` and `Seek` are implemented for this type in this file by calling the
/// object's Python methods of the same names while holding the GIL.
pub(crate) struct PyFileLikeObject {
    /// The wrapped Python object.
    inner: PyObject,
    /// The object expects a `str` instead of `bytes` for `write`.
    expects_str: bool,
    /// The object has a `flush` method.
    has_flush: bool,
}
35
// Empty impl: nothing is overridden here, so whatever `WriteClose` provides by default
// (if anything) applies to `PyFileLikeObject` unchanged.
impl WriteClose for PyFileLikeObject {}
37impl DynWriteable for PyFileLikeObject {
38    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
39        self as _
40    }
41    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
42        self as _
43    }
44    fn close(self: Box<Self>) -> io::Result<()> {
45        Ok(())
46    }
47    fn sync_on_close(&mut self, _sync_on_close: SyncOnCloseType) -> io::Result<()> {
48        Ok(())
49    }
50}
51
52impl Clone for PyFileLikeObject {
53    fn clone(&self) -> Self {
54        Python::with_gil(|py| Self {
55            inner: self.inner.clone_ref(py),
56            expects_str: self.expects_str,
57            has_flush: self.has_flush,
58        })
59    }
60}
61
62/// Wraps a `PyObject`, and implements read, seek, and write for it.
63impl PyFileLikeObject {
64    /// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
65    /// To assert the object has the required methods,
66    /// instantiate it with `PyFileLikeObject::require`
67    pub(crate) fn new(object: PyObject, expects_str: bool, has_flush: bool) -> Self {
68        PyFileLikeObject {
69            inner: object,
70            expects_str,
71            has_flush,
72        }
73    }
74
75    pub(crate) fn to_memslice(&self) -> MemSlice {
76        Python::with_gil(|py| {
77            let bytes = self
78                .inner
79                .call_method(py, "read", (), None)
80                .expect("no read method found");
81
82            if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
83                return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
84            }
85
86            if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
87                return match b.to_cow().expect("PyString is not valid UTF-8") {
88                    Cow::Borrowed(v) => {
89                        MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
90                    },
91                    Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
92                };
93            }
94
95            panic!("Expecting to be able to downcast into bytes from read result.");
96        })
97    }
98
99    /// Validates that the underlying
100    /// python object has a `read`, `write`, and `seek` methods in respect to parameters.
101    /// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
102    pub(crate) fn ensure_requirements(
103        object: &Bound<PyAny>,
104        read: bool,
105        write: bool,
106        seek: bool,
107    ) -> PyResult<()> {
108        if read && object.getattr("read").is_err() {
109            return Err(PyErr::new::<PyTypeError, _>(
110                "Object does not have a .read() method.",
111            ));
112        }
113
114        if seek && object.getattr("seek").is_err() {
115            return Err(PyErr::new::<PyTypeError, _>(
116                "Object does not have a .seek() method.",
117            ));
118        }
119
120        if write && object.getattr("write").is_err() {
121            return Err(PyErr::new::<PyTypeError, _>(
122                "Object does not have a .write() method.",
123            ));
124        }
125
126        Ok(())
127    }
128}
129
130/// Extracts a string repr from, and returns an IO error to send back to rust.
131fn pyerr_to_io_err(e: PyErr) -> io::Error {
132    Python::with_gil(|py| {
133        let e_as_object: PyObject = e.into_py_any(py).unwrap();
134
135        match e_as_object.call_method(py, "__str__", (), None) {
136            Ok(repr) => match repr.extract::<String>(py) {
137                Ok(s) => io::Error::other(s),
138                Err(_e) => io::Error::other("An unknown error has occurred"),
139            },
140            Err(_) => io::Error::other("Err doesn't have __str__"),
141        }
142    })
143}
144
145impl Read for PyFileLikeObject {
146    fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
147        Python::with_gil(|py| {
148            let bytes = self
149                .inner
150                .call_method(py, "read", (buf.len(),), None)
151                .map_err(pyerr_to_io_err)?;
152
153            let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
154
155            if let Ok(bytes) = opt_bytes {
156                buf.write_all(bytes.as_bytes())?;
157
158                bytes.len().map_err(pyerr_to_io_err)
159            } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
160                let s = s.to_cow().map_err(pyerr_to_io_err)?;
161                buf.write_all(s.as_bytes())?;
162                Ok(s.len())
163            } else {
164                Err(io::Error::new(
165                    ErrorKind::InvalidInput,
166                    polars_err!(InvalidOperation: "could not read from input"),
167                ))
168            }
169        })
170    }
171}
172
173impl Write for PyFileLikeObject {
174    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
175        // Note on the .extract() method:
176        // In case of a PyString object, it returns the number of chars,
177        // so we need to take extra steps if the underlying string is not all ASCII.
178        // In case of a ByBytes object, it returns the number of bytes.
179        let expects_str = self.expects_str;
180        let expects_str_and_is_ascii = expects_str && buf.is_ascii();
181
182        Python::with_gil(|py| {
183            let n_bytes = if expects_str_and_is_ascii {
184                let number_chars_written = unsafe {
185                    self.inner.call_method(
186                        py,
187                        "write",
188                        (PyString::new(py, std::str::from_utf8_unchecked(buf)),),
189                        None,
190                    )
191                }
192                .map_err(pyerr_to_io_err)?;
193                number_chars_written.extract(py).map_err(pyerr_to_io_err)?
194            } else if expects_str {
195                let number_chars_written = self
196                    .inner
197                    .call_method(
198                        py,
199                        "write",
200                        (PyString::new(
201                            py,
202                            std::str::from_utf8(buf).map_err(io::Error::other)?,
203                        ),),
204                        None,
205                    )
206                    .map_err(pyerr_to_io_err)?;
207                let n_chars: usize = number_chars_written.extract(py).map_err(pyerr_to_io_err)?;
208                // calculate n_bytes
209                if n_chars > 0 {
210                    std::str::from_utf8(buf)
211                        .map(|str| {
212                            str.char_indices()
213                                .nth(n_chars - 1)
214                                .map(|(i, ch)| i + ch.len_utf8())
215                                .unwrap()
216                        })
217                        .expect("unable to parse buffer as utf-8")
218                } else {
219                    0
220                }
221            } else {
222                let number_bytes_written = self
223                    .inner
224                    .call_method(py, "write", (PyBytes::new(py, buf),), None)
225                    .map_err(pyerr_to_io_err)?;
226                number_bytes_written.extract(py).map_err(pyerr_to_io_err)?
227            };
228            Ok(n_bytes)
229        })
230    }
231
232    fn flush(&mut self) -> Result<(), io::Error> {
233        if self.has_flush {
234            Python::with_gil(|py| {
235                self.inner
236                    .call_method(py, "flush", (), None)
237                    .map_err(pyerr_to_io_err)
238            })?;
239        }
240
241        Ok(())
242    }
243}
244
245impl Seek for PyFileLikeObject {
246    fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
247        Python::with_gil(|py| {
248            let (whence, offset) = match pos {
249                SeekFrom::Start(i) => (0, i as i64),
250                SeekFrom::Current(i) => (1, i),
251                SeekFrom::End(i) => (2, i),
252            };
253
254            let new_position = self
255                .inner
256                .call_method(py, "seek", (offset, whence), None)
257                .map_err(pyerr_to_io_err)?;
258
259            new_position.extract(py).map_err(pyerr_to_io_err)
260        })
261    }
262}
263
/// Umbrella trait for anything that supports `Read`, `Write` and `Seek` and is also
/// `Sync + Send`. It is the trait-object type returned by `EitherRustPythonFile::into_dyn`.
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

// Marker impls: the native file types and the Python wrapper all qualify as `FileLike`.
impl FileLike for File {}
impl FileLike for ClosableFile {}
impl FileLike for PyFileLikeObject {}
// Empty impl: `PyFileLikeObject` overrides nothing of `MmapBytesReader`.
impl MmapBytesReader for PyFileLikeObject {}
270
/// A file that is backed either by a Python object or by a native Rust file handle.
pub(crate) enum EitherRustPythonFile {
    /// A Python file-like object; I/O on it goes through Python method calls.
    Py(PyFileLikeObject),
    /// A native file handle.
    Rust(ClosableFile),
}
275
276impl EitherRustPythonFile {
277    pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
278        match self {
279            EitherRustPythonFile::Py(f) => Box::new(f),
280            EitherRustPythonFile::Rust(f) => Box::new(f),
281        }
282    }
283
284    fn into_scan_source_input(self) -> PythonScanSourceInput {
285        match self {
286            EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
287            EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
288        }
289    }
290
291    pub(crate) fn into_writeable(self) -> Box<dyn DynWriteable> {
292        match self {
293            Self::Py(f) => Box::new(f),
294            Self::Rust(f) => Box::new(f),
295        }
296    }
297}
298
/// The forms a scan source passed in from Python can take, as produced by
/// `get_python_scan_source_input`.
pub(crate) enum PythonScanSourceInput {
    /// The data is already in memory (a `bytes` object or the contents read from a
    /// Python file-like object).
    Buffer(MemSlice),
    /// The source was given as a string and is treated as a path.
    Path(PlPath),
    /// The source is a native file handle.
    File(ClosableFile),
}
304
/// Turns a Python file object into something Rust can perform I/O on.
///
/// Returns the file together with its path, when one could be determined:
/// * On Unix, exact instances of `io.FileIO`, of the `io.Buffered*` classes and of a UTF-8
///   `io.TextIOWrapper` are replaced by a duplicate of their file descriptor and returned
///   as `EitherRustPythonFile::Rust`. The path is looked up through `/proc/self/fd` and is
///   `None` if that lookup fails.
/// * Everything else is wrapped in a `PyFileLikeObject` and returned as
///   `EitherRustPythonFile::Py` without a path. A `TextIOWrapper` (or subclass) is first
///   replaced by its underlying `buffer`.
///
/// `write` selects the required capabilities: a `write` method when `true`, `read` and
/// `seek` methods when `false`.
///
/// # Errors
/// Propagates Python exceptions raised while importing `io` or inspecting / flushing /
/// seeking the object, fails if a `TextIOWrapper` is not UTF-8 encoded, and fails with a
/// `TypeError` if a required method is missing.
///
/// # Panics
/// Panics if one of the expected classes cannot be looked up on the `io` module.
pub(crate) fn try_get_pyfile(
    py: Python<'_>,
    py_f: Bound<'_, PyAny>,
    write: bool,
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
    let io = py.import("io")?;
    // Reads the object's `encoding` attribute and accepts both spellings of UTF-8,
    // ignoring ASCII case.
    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
        let encoding = py_f.getattr("encoding")?;
        let encoding = encoding.extract::<Cow<str>>()?;
        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
    };

    // Fast path (Unix only): work on the file descriptor directly.
    // Note the precedence: an exact `FileIO` qualifies on its own, whereas the buffered
    // and text classes additionally require the flush/seek call below to succeed.
    // If any step yields no usable descriptor, execution falls through to the generic
    // Python-object path further down.
    #[cfg(target_family = "unix")]
    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
                && is_utf8_encoding(&py_f)?))
            && if write {
                // flush the Python-side write buffer
                py_f.call_method0("flush").is_ok()
            } else {
                // zero-offset relative seek; presumably meant to invalidate the
                // Python-side read buffer (NOTE(review): confirm)
                py_f.call_method1("seek", (0, 1)).is_ok()
            })
    .then(|| {
        py_f.getattr("fileno")
            .and_then(|fileno| fileno.call0())
            .and_then(|fileno| fileno.extract::<libc::c_int>())
            .ok()
    })
    .flatten()
    .map(|fileno| unsafe {
        // `File::from_raw_fd()` takes the ownership of the file descriptor.
        // When the File is dropped, it closes the file descriptor.
        // This is undesired - the Python file object will become invalid.
        // Therefore, we duplicate the file descriptor here.
        // Closing the duplicated file descriptor will not close
        // the original file descriptor;
        // and the status, e.g. stream position, is still shared with
        // the original file descriptor.
        // We use `F_DUPFD_CLOEXEC` here instead of `dup()`
        // because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
        // which `dup()` clears.
        // `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
        // it prevents leaking file descriptors across processes,
        // and we want to be consistent with them.
        // `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
        // and is present on all alive UNIX(-like) systems.
        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
    })
    // `fcntl` signals failure with -1; treat that as "no descriptor available".
    .filter(|fileno| *fileno != -1)
    .map(|fileno| fileno as RawFd)
    {
        return Ok((
            // `fd` is the duplicate created above, so the `File` may take ownership of it
            // and close it on drop.
            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
            // This works on Linux and BSD with procfs mounted,
            // otherwise it fails silently.
            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
        ));
    }

    // Unwrap TextIOWrapper
    // Allow subclasses to allow things like pytest.capture.CaptureIO
    let py_f = if py_f
        .is_instance(&io.getattr("TextIOWrapper").unwrap())
        .unwrap_or_default()
    {
        if !is_utf8_encoding(&py_f)? {
            return Err(PyPolarsErr::from(
                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
            )
            .into());
        }
        // XXX: we have to clear buffer here.
        // Is there a better solution?
        if write {
            py_f.call_method0("flush")?;
        } else {
            py_f.call_method1("seek", (0, 1))?;
        }
        py_f.getattr("buffer")?
    } else {
        py_f
    };
    // Reading needs `read` and `seek`; writing needs `write`.
    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
    // Text streams take `str` in `write`, everything else takes `bytes`.
    let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
    let has_flush = py_f
        .getattr_opt("flush")?
        .is_some_and(|flush| flush.is_callable());
    let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
    Ok((EitherRustPythonFile::Py(f), None))
}
400
401pub(crate) fn get_python_scan_source_input(
402    py_f: PyObject,
403    write: bool,
404) -> PyResult<PythonScanSourceInput> {
405    Python::with_gil(|py| {
406        let py_f = py_f.into_bound(py);
407
408        // CPython has some internal tricks that means much of the time
409        // BytesIO.getvalue() involves no memory copying, unlike
410        // BytesIO.read(). So we want to handle BytesIO specially in order
411        // to save memory.
412        let py_f = read_if_bytesio(py_f);
413
414        // If the pyobject is a `bytes` class
415        if let Ok(b) = py_f.downcast::<PyBytes>() {
416            return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
417                b.as_bytes(),
418                // We want to specifically keep alive the PyBytes object.
419                Arc::new(b.clone().unbind()),
420            )));
421        }
422
423        if let Ok(s) = py_f.extract::<Cow<str>>() {
424            let mut file_path = PlPath::new(&s);
425            if let Some(p) = file_path.as_ref().as_local_path() {
426                if p.starts_with("~/") {
427                    file_path = PlPath::Local(resolve_homedir(&p).into());
428                }
429            }
430            Ok(PythonScanSourceInput::Path(file_path))
431        } else {
432            Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
433        }
434    })
435}
436
437fn get_either_buffer_or_path(
438    py_f: PyObject,
439    write: bool,
440) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
441    Python::with_gil(|py| {
442        let py_f = py_f.into_bound(py);
443        if let Ok(s) = py_f.extract::<Cow<str>>() {
444            let file_path = resolve_homedir(&&*s);
445            let f = if write {
446                create_file(&file_path).map_err(PyPolarsErr::from)?
447            } else {
448                polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
449            };
450            Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
451        } else {
452            try_get_pyfile(py, py_f, write)
453        }
454    })
455}
456
457///
458/// # Arguments
459/// * `write` - open for writing; will truncate existing file and create new file if not.
460pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
461    Ok(get_either_buffer_or_path(py_f, write)?.0)
462}
463
464pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
465    Ok(get_either_file(f, truncate)?.into_dyn())
466}
467
468/// If the give file-like is a BytesIO, read its contents in a memory-efficient
469/// way.
470fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
471    let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
472    if py_f.is_instance(&bytes_io).unwrap() {
473        // Note that BytesIO has some memory optimizations ensuring that much of
474        // the time getvalue() doesn't need to copy the underlying data:
475        let Ok(bytes) = py_f.call_method0("getvalue") else {
476            return py_f;
477        };
478        return bytes;
479    }
480    py_f
481}
482
483/// Create reader from PyBytes or a file-like object.
484pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
485    get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
486}
487
488pub(crate) fn get_mmap_bytes_reader_and_path(
489    py_f: &Bound<PyAny>,
490) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
491    let py_f = read_if_bytesio(py_f.clone());
492
493    // bytes object
494    if let Ok(bytes) = py_f.downcast::<PyBytes>() {
495        Ok((
496            Box::new(Cursor::new(MemSlice::from_arc(
497                bytes.as_bytes(),
498                Arc::new(py_f.clone().unbind()),
499            ))),
500            None,
501        ))
502    }
503    // string so read file
504    else {
505        match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
506            (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
507            (EitherRustPythonFile::Py(f), path) => {
508                Ok((Box::new(Cursor::new(f.to_memslice())), path))
509            },
510        }
511    }
512}