polars_python/
file.rs

1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars::prelude::file::DynWriteable;
14use polars::prelude::sync_on_close::SyncOnCloseType;
15use polars_error::{PolarsResult, polars_err};
16use polars_io::cloud::CloudOptions;
17use polars_utils::create_file;
18use polars_utils::file::{ClosableFile, WriteClose};
19use polars_utils::mmap::MemSlice;
20use pyo3::IntoPyObjectExt;
21use pyo3::exceptions::PyTypeError;
22use pyo3::prelude::*;
23use pyo3::types::{PyBytes, PyString, PyStringMethods};
24
25use crate::error::PyPolarsErr;
26use crate::prelude::resolve_homedir;
27
/// Thin wrapper around a Python object that is expected to behave like a file.
///
/// The `Read`, `Write` and `Seek` implementations in this file forward to the
/// wrapped object's Python methods of the same name.
pub(crate) struct PyFileLikeObject {
    /// The wrapped Python object; no validation is performed when it is stored.
    inner: PyObject,
}
31
32impl WriteClose for PyFileLikeObject {}
33impl DynWriteable for PyFileLikeObject {
34    fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
35        self as _
36    }
37    fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
38        self as _
39    }
40    fn close(self: Box<Self>) -> io::Result<()> {
41        Ok(())
42    }
43    fn sync_on_close(&mut self, _sync_on_close: SyncOnCloseType) -> io::Result<()> {
44        Ok(())
45    }
46}
47
48impl Clone for PyFileLikeObject {
49    fn clone(&self) -> Self {
50        Python::with_gil(|py| Self {
51            inner: self.inner.clone_ref(py),
52        })
53    }
54}
55
56/// Wraps a `PyObject`, and implements read, seek, and write for it.
57impl PyFileLikeObject {
58    /// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
59    /// To assert the object has the required methods,
60    /// instantiate it with `PyFileLikeObject::require`
61    pub(crate) fn new(object: PyObject) -> Self {
62        PyFileLikeObject { inner: object }
63    }
64
65    pub(crate) fn to_memslice(&self) -> MemSlice {
66        Python::with_gil(|py| {
67            let bytes = self
68                .inner
69                .call_method(py, "read", (), None)
70                .expect("no read method found");
71
72            if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
73                return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
74            }
75
76            if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
77                return match b.to_cow().expect("PyString is not valid UTF-8") {
78                    Cow::Borrowed(v) => {
79                        MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
80                    },
81                    Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
82                };
83            }
84
85            panic!("Expecting to be able to downcast into bytes from read result.");
86        })
87    }
88
89    /// Validates that the underlying
90    /// python object has a `read`, `write`, and `seek` methods in respect to parameters.
91    /// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
92    pub(crate) fn ensure_requirements(
93        object: &Bound<PyAny>,
94        read: bool,
95        write: bool,
96        seek: bool,
97    ) -> PyResult<()> {
98        if read && object.getattr("read").is_err() {
99            return Err(PyErr::new::<PyTypeError, _>(
100                "Object does not have a .read() method.",
101            ));
102        }
103
104        if seek && object.getattr("seek").is_err() {
105            return Err(PyErr::new::<PyTypeError, _>(
106                "Object does not have a .seek() method.",
107            ));
108        }
109
110        if write && object.getattr("write").is_err() {
111            return Err(PyErr::new::<PyTypeError, _>(
112                "Object does not have a .write() method.",
113            ));
114        }
115
116        Ok(())
117    }
118}
119
120/// Extracts a string repr from, and returns an IO error to send back to rust.
121fn pyerr_to_io_err(e: PyErr) -> io::Error {
122    Python::with_gil(|py| {
123        let e_as_object: PyObject = e.into_py_any(py).unwrap();
124
125        match e_as_object.call_method(py, "__str__", (), None) {
126            Ok(repr) => match repr.extract::<String>(py) {
127                Ok(s) => io::Error::other(s),
128                Err(_e) => io::Error::other("An unknown error has occurred"),
129            },
130            Err(_) => io::Error::other("Err doesn't have __str__"),
131        }
132    })
133}
134
135impl Read for PyFileLikeObject {
136    fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
137        Python::with_gil(|py| {
138            let bytes = self
139                .inner
140                .call_method(py, "read", (buf.len(),), None)
141                .map_err(pyerr_to_io_err)?;
142
143            let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
144
145            if let Ok(bytes) = opt_bytes {
146                buf.write_all(bytes.as_bytes())?;
147
148                bytes.len().map_err(pyerr_to_io_err)
149            } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
150                let s = s.to_cow().map_err(pyerr_to_io_err)?;
151                buf.write_all(s.as_bytes())?;
152                Ok(s.len())
153            } else {
154                Err(io::Error::new(
155                    ErrorKind::InvalidInput,
156                    polars_err!(InvalidOperation: "could not read from input"),
157                ))
158            }
159        })
160    }
161}
162
163impl Write for PyFileLikeObject {
164    fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
165        Python::with_gil(|py| {
166            let pybytes = PyBytes::new(py, buf);
167
168            let number_bytes_written = self
169                .inner
170                .call_method(py, "write", (pybytes,), None)
171                .map_err(pyerr_to_io_err)?;
172
173            number_bytes_written.extract(py).map_err(pyerr_to_io_err)
174        })
175    }
176
177    fn flush(&mut self) -> Result<(), io::Error> {
178        Python::with_gil(|py| {
179            self.inner
180                .call_method(py, "flush", (), None)
181                .map_err(pyerr_to_io_err)?;
182
183            Ok(())
184        })
185    }
186}
187
188impl Seek for PyFileLikeObject {
189    fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
190        Python::with_gil(|py| {
191            let (whence, offset) = match pos {
192                SeekFrom::Start(i) => (0, i as i64),
193                SeekFrom::Current(i) => (1, i),
194                SeekFrom::End(i) => (2, i),
195            };
196
197            let new_position = self
198                .inner
199                .call_method(py, "seek", (offset, whence), None)
200                .map_err(pyerr_to_io_err)?;
201
202            new_position.extract(py).map_err(pyerr_to_io_err)
203        })
204    }
205}
206
/// Umbrella trait for anything that can be read, written and seeked, and can
/// be shared and sent across threads. It adds no methods of its own.
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

// Marker impls for the concrete file types used in this module.
impl FileLike for File {}
impl FileLike for ClosableFile {}
impl FileLike for PyFileLikeObject {}
// Marker impl; relies on the trait's defaults.
impl MmapBytesReader for PyFileLikeObject {}
213
/// A file handle that is backed either by a Python object or by a native
/// Rust file.
pub(crate) enum EitherRustPythonFile {
    /// I/O is forwarded to a Python file-like object.
    Py(PyFileLikeObject),
    /// I/O goes through a native file handle.
    Rust(ClosableFile),
}
218
219impl EitherRustPythonFile {
220    pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
221        match self {
222            EitherRustPythonFile::Py(f) => Box::new(f),
223            EitherRustPythonFile::Rust(f) => Box::new(f),
224        }
225    }
226
227    fn into_scan_source_input(self) -> PythonScanSourceInput {
228        match self {
229            EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
230            EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
231        }
232    }
233
234    pub(crate) fn into_writeable(self) -> Box<dyn DynWriteable> {
235        match self {
236            Self::Py(f) => Box::new(f),
237            Self::Rust(f) => Box::new(f),
238        }
239    }
240
241    pub(crate) fn into_dyn_writeable(self) -> Box<dyn WriteClose + Send> {
242        match self {
243            EitherRustPythonFile::Py(f) => Box::new(f),
244            EitherRustPythonFile::Rust(f) => Box::new(f),
245        }
246    }
247}
248
/// The forms a scan source supplied from Python can take once resolved.
pub(crate) enum PythonScanSourceInput {
    /// Data that is already in memory.
    Buffer(MemSlice),
    /// A filesystem path.
    Path(PathBuf),
    /// An open native file handle.
    File(ClosableFile),
}
254
255pub(crate) fn try_get_pyfile(
256    py: Python,
257    py_f: Bound<'_, PyAny>,
258    write: bool,
259) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
260    let io = py.import("io")?;
261    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
262        let encoding = py_f.getattr("encoding")?;
263        let encoding = encoding.extract::<Cow<str>>()?;
264        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
265    };
266
267    #[cfg(target_family = "unix")]
268    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
269        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
270            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
271            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
272            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
273            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
274                && is_utf8_encoding(&py_f)?))
275            && if write {
276                // invalidate read buffer
277                py_f.call_method0("flush").is_ok()
278            } else {
279                // flush write buffer
280                py_f.call_method1("seek", (0, 1)).is_ok()
281            })
282    .then(|| {
283        py_f.getattr("fileno")
284            .and_then(|fileno| fileno.call0())
285            .and_then(|fileno| fileno.extract::<libc::c_int>())
286            .ok()
287    })
288    .flatten()
289    .map(|fileno| unsafe {
290        // `File::from_raw_fd()` takes the ownership of the file descriptor.
291        // When the File is dropped, it closes the file descriptor.
292        // This is undesired - the Python file object will become invalid.
293        // Therefore, we duplicate the file descriptor here.
294        // Closing the duplicated file descriptor will not close
295        // the original file descriptor;
296        // and the status, e.g. stream position, is still shared with
297        // the original file descriptor.
298        // We use `F_DUPFD_CLOEXEC` here instead of `dup()`
299        // because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
300        // which `dup()` clears.
301        // `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
302        // it prevents leaking file descriptors across processes,
303        // and we want to be consistent with them.
304        // `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
305        // and is present on all alive UNIX(-like) systems.
306        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
307    })
308    .filter(|fileno| *fileno != -1)
309    .map(|fileno| fileno as RawFd)
310    {
311        return Ok((
312            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
313            // This works on Linux and BSD with procfs mounted,
314            // otherwise it fails silently.
315            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
316        ));
317    }
318
319    // Unwrap TextIOWrapper
320    // Allow subclasses to allow things like pytest.capture.CaptureIO
321    let py_f = if py_f
322        .is_instance(&io.getattr("TextIOWrapper").unwrap())
323        .unwrap_or_default()
324    {
325        if !is_utf8_encoding(&py_f)? {
326            return Err(PyPolarsErr::from(
327                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
328            )
329            .into());
330        }
331        // XXX: we have to clear buffer here.
332        // Is there a better solution?
333        if write {
334            py_f.call_method0("flush")?;
335        } else {
336            py_f.call_method1("seek", (0, 1))?;
337        }
338        py_f.getattr("buffer")?
339    } else {
340        py_f
341    };
342    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
343    let f = PyFileLikeObject::new(py_f.unbind());
344    Ok((EitherRustPythonFile::Py(f), None))
345}
346
347pub(crate) fn get_python_scan_source_input(
348    py_f: PyObject,
349    write: bool,
350) -> PyResult<PythonScanSourceInput> {
351    Python::with_gil(|py| {
352        let py_f = py_f.into_bound(py);
353
354        // CPython has some internal tricks that means much of the time
355        // BytesIO.getvalue() involves no memory copying, unlike
356        // BytesIO.read(). So we want to handle BytesIO specially in order
357        // to save memory.
358        let py_f = read_if_bytesio(py_f);
359
360        // If the pyobject is a `bytes` class
361        if let Ok(b) = py_f.downcast::<PyBytes>() {
362            return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
363                b.as_bytes(),
364                // We want to specifically keep alive the PyBytes object.
365                Arc::new(b.clone().unbind()),
366            )));
367        }
368
369        if let Ok(s) = py_f.extract::<Cow<str>>() {
370            let file_path = resolve_homedir(&&*s);
371            Ok(PythonScanSourceInput::Path(file_path))
372        } else {
373            Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
374        }
375    })
376}
377
378fn get_either_buffer_or_path(
379    py_f: PyObject,
380    write: bool,
381) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
382    Python::with_gil(|py| {
383        let py_f = py_f.into_bound(py);
384        if let Ok(s) = py_f.extract::<Cow<str>>() {
385            let file_path = resolve_homedir(&&*s);
386            let f = if write {
387                create_file(&file_path).map_err(PyPolarsErr::from)?
388            } else {
389                polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
390            };
391            Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
392        } else {
393            try_get_pyfile(py, py_f, write)
394        }
395    })
396}
397
398///
399/// # Arguments
400/// * `write` - open for writing; will truncate existing file and create new file if not.
401pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
402    Ok(get_either_buffer_or_path(py_f, write)?.0)
403}
404
405pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
406    Ok(get_either_file(f, truncate)?.into_dyn())
407}
408
409/// If the give file-like is a BytesIO, read its contents in a memory-efficient
410/// way.
411fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
412    let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
413    if py_f.is_instance(&bytes_io).unwrap() {
414        // Note that BytesIO has some memory optimizations ensuring that much of
415        // the time getvalue() doesn't need to copy the underlying data:
416        let Ok(bytes) = py_f.call_method0("getvalue") else {
417            return py_f;
418        };
419        return bytes;
420    }
421    py_f
422}
423
424/// Create reader from PyBytes or a file-like object.
425pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
426    get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
427}
428
429pub(crate) fn get_mmap_bytes_reader_and_path(
430    py_f: &Bound<PyAny>,
431) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
432    let py_f = read_if_bytesio(py_f.clone());
433
434    // bytes object
435    if let Ok(bytes) = py_f.downcast::<PyBytes>() {
436        Ok((
437            Box::new(Cursor::new(MemSlice::from_arc(
438                bytes.as_bytes(),
439                Arc::new(py_f.clone().unbind()),
440            ))),
441            None,
442        ))
443    }
444    // string so read file
445    else {
446        match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
447            (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
448            (EitherRustPythonFile::Py(f), path) => {
449                Ok((Box::new(Cursor::new(f.to_memslice())), path))
450            },
451        }
452    }
453}
454
455pub(crate) fn try_get_writeable(
456    py_f: PyObject,
457    cloud_options: Option<&CloudOptions>,
458) -> PyResult<Box<dyn WriteClose + Send>> {
459    Python::with_gil(|py| {
460        let py_f = py_f.into_bound(py);
461
462        if let Ok(s) = py_f.extract::<Cow<str>>() {
463            polars::prelude::file::try_get_writeable(&s, cloud_options)
464                .map_err(PyPolarsErr::from)
465                .map_err(|e| e.into())
466        } else {
467            Ok(try_get_pyfile(py, py_f, true)?.0.into_dyn_writeable())
468        }
469    })
470}
471
472pub(crate) fn close_file(f: Box<dyn WriteClose>) -> PolarsResult<()> {
473    f.close().map_err(|e| {
474        let err: polars_error::PolarsError = e.into();
475        err
476    })
477}