1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars::prelude::file::DynWriteable;
14use polars::prelude::sync_on_close::SyncOnCloseType;
15use polars_error::{PolarsResult, polars_err};
16use polars_io::cloud::CloudOptions;
17use polars_utils::create_file;
18use polars_utils::file::{ClosableFile, WriteClose};
19use polars_utils::mmap::MemSlice;
20use pyo3::IntoPyObjectExt;
21use pyo3::exceptions::PyTypeError;
22use pyo3::prelude::*;
23use pyo3::types::{PyBytes, PyString, PyStringMethods};
24
25use crate::error::PyPolarsErr;
26use crate::prelude::resolve_homedir;
27
28pub(crate) struct PyFileLikeObject {
29 inner: PyObject,
30}
31
32impl WriteClose for PyFileLikeObject {}
33impl DynWriteable for PyFileLikeObject {
34 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
35 self as _
36 }
37 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
38 self as _
39 }
40 fn close(self: Box<Self>) -> io::Result<()> {
41 Ok(())
42 }
43 fn sync_on_close(&mut self, _sync_on_close: SyncOnCloseType) -> io::Result<()> {
44 Ok(())
45 }
46}
47
48impl Clone for PyFileLikeObject {
49 fn clone(&self) -> Self {
50 Python::with_gil(|py| Self {
51 inner: self.inner.clone_ref(py),
52 })
53 }
54}
55
56impl PyFileLikeObject {
58 pub(crate) fn new(object: PyObject) -> Self {
62 PyFileLikeObject { inner: object }
63 }
64
65 pub(crate) fn to_memslice(&self) -> MemSlice {
66 Python::with_gil(|py| {
67 let bytes = self
68 .inner
69 .call_method(py, "read", (), None)
70 .expect("no read method found");
71
72 if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
73 return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
74 }
75
76 if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
77 return match b.to_cow().expect("PyString is not valid UTF-8") {
78 Cow::Borrowed(v) => {
79 MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
80 },
81 Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
82 };
83 }
84
85 panic!("Expecting to be able to downcast into bytes from read result.");
86 })
87 }
88
89 pub(crate) fn ensure_requirements(
93 object: &Bound<PyAny>,
94 read: bool,
95 write: bool,
96 seek: bool,
97 ) -> PyResult<()> {
98 if read && object.getattr("read").is_err() {
99 return Err(PyErr::new::<PyTypeError, _>(
100 "Object does not have a .read() method.",
101 ));
102 }
103
104 if seek && object.getattr("seek").is_err() {
105 return Err(PyErr::new::<PyTypeError, _>(
106 "Object does not have a .seek() method.",
107 ));
108 }
109
110 if write && object.getattr("write").is_err() {
111 return Err(PyErr::new::<PyTypeError, _>(
112 "Object does not have a .write() method.",
113 ));
114 }
115
116 Ok(())
117 }
118}
119
120fn pyerr_to_io_err(e: PyErr) -> io::Error {
122 Python::with_gil(|py| {
123 let e_as_object: PyObject = e.into_py_any(py).unwrap();
124
125 match e_as_object.call_method(py, "__str__", (), None) {
126 Ok(repr) => match repr.extract::<String>(py) {
127 Ok(s) => io::Error::other(s),
128 Err(_e) => io::Error::other("An unknown error has occurred"),
129 },
130 Err(_) => io::Error::other("Err doesn't have __str__"),
131 }
132 })
133}
134
135impl Read for PyFileLikeObject {
136 fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
137 Python::with_gil(|py| {
138 let bytes = self
139 .inner
140 .call_method(py, "read", (buf.len(),), None)
141 .map_err(pyerr_to_io_err)?;
142
143 let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
144
145 if let Ok(bytes) = opt_bytes {
146 buf.write_all(bytes.as_bytes())?;
147
148 bytes.len().map_err(pyerr_to_io_err)
149 } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
150 let s = s.to_cow().map_err(pyerr_to_io_err)?;
151 buf.write_all(s.as_bytes())?;
152 Ok(s.len())
153 } else {
154 Err(io::Error::new(
155 ErrorKind::InvalidInput,
156 polars_err!(InvalidOperation: "could not read from input"),
157 ))
158 }
159 })
160 }
161}
162
163impl Write for PyFileLikeObject {
164 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
165 Python::with_gil(|py| {
166 let pybytes = PyBytes::new(py, buf);
167
168 let number_bytes_written = self
169 .inner
170 .call_method(py, "write", (pybytes,), None)
171 .map_err(pyerr_to_io_err)?;
172
173 number_bytes_written.extract(py).map_err(pyerr_to_io_err)
174 })
175 }
176
177 fn flush(&mut self) -> Result<(), io::Error> {
178 Python::with_gil(|py| {
179 self.inner
180 .call_method(py, "flush", (), None)
181 .map_err(pyerr_to_io_err)?;
182
183 Ok(())
184 })
185 }
186}
187
188impl Seek for PyFileLikeObject {
189 fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
190 Python::with_gil(|py| {
191 let (whence, offset) = match pos {
192 SeekFrom::Start(i) => (0, i as i64),
193 SeekFrom::Current(i) => (1, i),
194 SeekFrom::End(i) => (2, i),
195 };
196
197 let new_position = self
198 .inner
199 .call_method(py, "seek", (offset, whence), None)
200 .map_err(pyerr_to_io_err)?;
201
202 new_position.extract(py).map_err(pyerr_to_io_err)
203 })
204 }
205}
206
207pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}
208
209impl FileLike for File {}
210impl FileLike for ClosableFile {}
211impl FileLike for PyFileLikeObject {}
212impl MmapBytesReader for PyFileLikeObject {}
213
214pub(crate) enum EitherRustPythonFile {
215 Py(PyFileLikeObject),
216 Rust(ClosableFile),
217}
218
219impl EitherRustPythonFile {
220 pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
221 match self {
222 EitherRustPythonFile::Py(f) => Box::new(f),
223 EitherRustPythonFile::Rust(f) => Box::new(f),
224 }
225 }
226
227 fn into_scan_source_input(self) -> PythonScanSourceInput {
228 match self {
229 EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
230 EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
231 }
232 }
233
234 pub(crate) fn into_writeable(self) -> Box<dyn DynWriteable> {
235 match self {
236 Self::Py(f) => Box::new(f),
237 Self::Rust(f) => Box::new(f),
238 }
239 }
240
241 pub(crate) fn into_dyn_writeable(self) -> Box<dyn WriteClose + Send> {
242 match self {
243 EitherRustPythonFile::Py(f) => Box::new(f),
244 EitherRustPythonFile::Rust(f) => Box::new(f),
245 }
246 }
247}
248
249pub(crate) enum PythonScanSourceInput {
250 Buffer(MemSlice),
251 Path(PathBuf),
252 File(ClosableFile),
253}
254
/// Resolves a Python file-like object into something Rust can read from or write to.
///
/// On Unix, a fast path applies when `py_f` is an *exact* instance of `io.FileIO`,
/// `io.BufferedReader`, `io.BufferedWriter`, `io.BufferedRandom`, `io.BufferedRWPair`, or a
/// UTF-8 `io.TextIOWrapper`. In that case its file descriptor is duplicated and wrapped as a
/// native [`File`], and a best-effort path is resolved through `/proc/self/fd`.
///
/// Otherwise, any `TextIOWrapper` (including subclasses) must be UTF-8 and is replaced by
/// its `.buffer`. The object is then validated with
/// [`PyFileLikeObject::ensure_requirements`] and wrapped as a [`PyFileLikeObject`]; no path
/// is returned in this case.
///
/// With `write == true` the object is flushed first and must have `.write()`. Otherwise
/// it is synced with `seek(0, 1)` and must have `.read()` and `.seek()`.
///
/// # Errors
///
/// Returns an error if:
/// - importing `io` fails;
/// - a `TextIOWrapper`'s `encoding` cannot be read or is not UTF-8;
/// - `flush`/`seek`/`.buffer` fails on the fallback path;
/// - the object lacks the required methods.
pub(crate) fn try_get_pyfile(
    py: Python,
    py_f: Bound<'_, PyAny>,
    write: bool,
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
    let io = py.import("io")?;
    // True when the object's `encoding` attribute equals "utf-8" or "utf8"
    // (ASCII case-insensitive).
    let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
        let encoding = py_f.getattr("encoding")?;
        let encoding = encoding.extract::<Cow<str>>()?;
        Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
    };

    // Unix fast path: operate directly on a duplicate of the object's file descriptor.
    // Only exact instances qualify, so subclasses that override I/O methods are not
    // bypassed. By `&&`/`||` precedence, the flush/seek below runs only for the buffered and
    // text types; an exact `FileIO` goes straight to `fileno()`.
    // A failing flush/seek, `fileno()` call or `fcntl` silently falls through to the slow path.
    // An error from reading a TextIOWrapper's `encoding` is propagated (`?`).
    #[cfg(target_family = "unix")]
    if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
        || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
            || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
            || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
                && is_utf8_encoding(&py_f)?))
            && if write {
                // Push data buffered on the Python side to the fd before Rust writes to it.
                py_f.call_method0("flush").is_ok()
            } else {
                // `seek(0, SEEK_CUR)`: intended to sync the fd offset with Python's logical
                // read position.
                // NOTE(review): CPython's BufferedReader may satisfy a seek that lands inside
                // its read-ahead buffer without moving the raw fd, so the shared offset could
                // still be ahead of the logical position — confirm.
                py_f.call_method1("seek", (0, 1)).is_ok()
            })
    .then(|| {
        py_f.getattr("fileno")
            .and_then(|fileno| fileno.call0())
            .and_then(|fileno| fileno.extract::<libc::c_int>())
            .ok()
    })
    .flatten()
    .map(|fileno| unsafe {
        // SAFETY: `fcntl(F_DUPFD_CLOEXEC)` only reads `fileno` and returns a new descriptor
        // or -1.
        // Why duplicate: `File::from_raw_fd` takes ownership and closes the fd on drop, which
        // would invalidate the Python object. The duplicate can be closed independently but
        // shares the file offset/status with the original.
        // `F_DUPFD_CLOEXEC` (POSIX.1-2008) also sets close-on-exec, which plain `dup()`
        // clears. This matches how Rust's and Python's `open()` create descriptors.
        libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
    })
    // -1 means the duplication failed; fall back to the Python path below.
    .filter(|fileno| *fileno != -1)
    .map(|fileno| fileno as RawFd)
    {
        return Ok((
            // SAFETY: `fd` is a freshly duplicated, valid descriptor owned by nobody else,
            // so transferring its ownership to `File` is sound.
            EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
            // Works where procfs is mounted at /proc (e.g. Linux); otherwise `canonicalize`
            // fails and the path is simply `None`.
            fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
        ));
    }

    // Slow path: drive the object through its Python methods. Text wrappers are unwrapped
    // to their underlying binary `.buffer` (UTF-8 only), after flushing or syncing so that
    // the wrapper's own buffered state is not lost.
    let py_f = if py_f
        .is_instance(&io.getattr("TextIOWrapper").unwrap())
        .unwrap_or_default()
    {
        if !is_utf8_encoding(&py_f)? {
            return Err(PyPolarsErr::from(
                polars_err!(InvalidOperation: "file encoding is not UTF-8"),
            )
            .into());
        }
        if write {
            py_f.call_method0("flush")?;
        } else {
            py_f.call_method1("seek", (0, 1))?;
        }
        py_f.getattr("buffer")?
    } else {
        py_f
    };
    // Argument order is (read, write, seek): readers need read + seek, writers need write.
    PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
    let f = PyFileLikeObject::new(py_f.unbind());
    Ok((EitherRustPythonFile::Py(f), None))
}
346
347pub(crate) fn get_python_scan_source_input(
348 py_f: PyObject,
349 write: bool,
350) -> PyResult<PythonScanSourceInput> {
351 Python::with_gil(|py| {
352 let py_f = py_f.into_bound(py);
353
354 let py_f = read_if_bytesio(py_f);
359
360 if let Ok(b) = py_f.downcast::<PyBytes>() {
362 return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
363 b.as_bytes(),
364 Arc::new(b.clone().unbind()),
366 )));
367 }
368
369 if let Ok(s) = py_f.extract::<Cow<str>>() {
370 let file_path = resolve_homedir(&&*s);
371 Ok(PythonScanSourceInput::Path(file_path))
372 } else {
373 Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
374 }
375 })
376}
377
378fn get_either_buffer_or_path(
379 py_f: PyObject,
380 write: bool,
381) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
382 Python::with_gil(|py| {
383 let py_f = py_f.into_bound(py);
384 if let Ok(s) = py_f.extract::<Cow<str>>() {
385 let file_path = resolve_homedir(&&*s);
386 let f = if write {
387 create_file(&file_path).map_err(PyPolarsErr::from)?
388 } else {
389 polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
390 };
391 Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
392 } else {
393 try_get_pyfile(py, py_f, write)
394 }
395 })
396}
397
398pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
402 Ok(get_either_buffer_or_path(py_f, write)?.0)
403}
404
405pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
406 Ok(get_either_file(f, truncate)?.into_dyn())
407}
408
409fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
412 let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
413 if py_f.is_instance(&bytes_io).unwrap() {
414 let Ok(bytes) = py_f.call_method0("getvalue") else {
417 return py_f;
418 };
419 return bytes;
420 }
421 py_f
422}
423
424pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
426 get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
427}
428
429pub(crate) fn get_mmap_bytes_reader_and_path(
430 py_f: &Bound<PyAny>,
431) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
432 let py_f = read_if_bytesio(py_f.clone());
433
434 if let Ok(bytes) = py_f.downcast::<PyBytes>() {
436 Ok((
437 Box::new(Cursor::new(MemSlice::from_arc(
438 bytes.as_bytes(),
439 Arc::new(py_f.clone().unbind()),
440 ))),
441 None,
442 ))
443 }
444 else {
446 match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
447 (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
448 (EitherRustPythonFile::Py(f), path) => {
449 Ok((Box::new(Cursor::new(f.to_memslice())), path))
450 },
451 }
452 }
453}
454
455pub(crate) fn try_get_writeable(
456 py_f: PyObject,
457 cloud_options: Option<&CloudOptions>,
458) -> PyResult<Box<dyn WriteClose + Send>> {
459 Python::with_gil(|py| {
460 let py_f = py_f.into_bound(py);
461
462 if let Ok(s) = py_f.extract::<Cow<str>>() {
463 polars::prelude::file::try_get_writeable(&s, cloud_options)
464 .map_err(PyPolarsErr::from)
465 .map_err(|e| e.into())
466 } else {
467 Ok(try_get_pyfile(py, py_f, true)?.0.into_dyn_writeable())
468 }
469 })
470}
471
472pub(crate) fn close_file(f: Box<dyn WriteClose>) -> PolarsResult<()> {
473 f.close().map_err(|e| {
474 let err: polars_error::PolarsError = e.into();
475 err
476 })
477}