1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars::prelude::PlPath;
14use polars::prelude::file::DynWriteable;
15use polars::prelude::sync_on_close::SyncOnCloseType;
16use polars_error::polars_err;
17use polars_utils::create_file;
18use polars_utils::file::{ClosableFile, WriteClose};
19use polars_utils::mmap::MemSlice;
20use pyo3::IntoPyObjectExt;
21use pyo3::exceptions::PyTypeError;
22use pyo3::prelude::*;
23use pyo3::types::{PyBytes, PyString, PyStringMethods};
24
25use crate::error::PyPolarsErr;
26use crate::prelude::resolve_homedir;
27
/// A Python object wrapped so that it can be used as a file-like value from Rust.
///
/// The `Read`, `Write` and `Seek` implementations in this file forward to the
/// object's `read`, `write`/`flush` and `seek` methods respectively.
pub(crate) struct PyFileLikeObject {
    /// The wrapped Python object.
    inner: PyObject,
    /// When `true`, `write` hands the object a Python `str` instead of `bytes`.
    expects_str: bool,
    /// When `true`, `flush` calls the object's `flush` method; otherwise it does nothing.
    has_flush: bool,
}
35
36impl WriteClose for PyFileLikeObject {}
37impl DynWriteable for PyFileLikeObject {
38 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
39 self as _
40 }
41 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
42 self as _
43 }
44 fn close(self: Box<Self>) -> io::Result<()> {
45 Ok(())
46 }
47 fn sync_on_close(&mut self, _sync_on_close: SyncOnCloseType) -> io::Result<()> {
48 Ok(())
49 }
50}
51
52impl Clone for PyFileLikeObject {
53 fn clone(&self) -> Self {
54 Python::with_gil(|py| Self {
55 inner: self.inner.clone_ref(py),
56 expects_str: self.expects_str,
57 has_flush: self.has_flush,
58 })
59 }
60}
61
62impl PyFileLikeObject {
64 pub(crate) fn new(object: PyObject, expects_str: bool, has_flush: bool) -> Self {
68 PyFileLikeObject {
69 inner: object,
70 expects_str,
71 has_flush,
72 }
73 }
74
75 pub(crate) fn to_memslice(&self) -> MemSlice {
76 Python::with_gil(|py| {
77 let bytes = self
78 .inner
79 .call_method(py, "read", (), None)
80 .expect("no read method found");
81
82 if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
83 return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
84 }
85
86 if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
87 return match b.to_cow().expect("PyString is not valid UTF-8") {
88 Cow::Borrowed(v) => {
89 MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
90 },
91 Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
92 };
93 }
94
95 panic!("Expecting to be able to downcast into bytes from read result.");
96 })
97 }
98
99 pub(crate) fn ensure_requirements(
103 object: &Bound<PyAny>,
104 read: bool,
105 write: bool,
106 seek: bool,
107 ) -> PyResult<()> {
108 if read && object.getattr("read").is_err() {
109 return Err(PyErr::new::<PyTypeError, _>(
110 "Object does not have a .read() method.",
111 ));
112 }
113
114 if seek && object.getattr("seek").is_err() {
115 return Err(PyErr::new::<PyTypeError, _>(
116 "Object does not have a .seek() method.",
117 ));
118 }
119
120 if write && object.getattr("write").is_err() {
121 return Err(PyErr::new::<PyTypeError, _>(
122 "Object does not have a .write() method.",
123 ));
124 }
125
126 Ok(())
127 }
128}
129
130fn pyerr_to_io_err(e: PyErr) -> io::Error {
132 Python::with_gil(|py| {
133 let e_as_object: PyObject = e.into_py_any(py).unwrap();
134
135 match e_as_object.call_method(py, "__str__", (), None) {
136 Ok(repr) => match repr.extract::<String>(py) {
137 Ok(s) => io::Error::other(s),
138 Err(_e) => io::Error::other("An unknown error has occurred"),
139 },
140 Err(_) => io::Error::other("Err doesn't have __str__"),
141 }
142 })
143}
144
145impl Read for PyFileLikeObject {
146 fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
147 Python::with_gil(|py| {
148 let bytes = self
149 .inner
150 .call_method(py, "read", (buf.len(),), None)
151 .map_err(pyerr_to_io_err)?;
152
153 let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
154
155 if let Ok(bytes) = opt_bytes {
156 buf.write_all(bytes.as_bytes())?;
157
158 bytes.len().map_err(pyerr_to_io_err)
159 } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
160 let s = s.to_cow().map_err(pyerr_to_io_err)?;
161 buf.write_all(s.as_bytes())?;
162 Ok(s.len())
163 } else {
164 Err(io::Error::new(
165 ErrorKind::InvalidInput,
166 polars_err!(InvalidOperation: "could not read from input"),
167 ))
168 }
169 })
170 }
171}
172
173impl Write for PyFileLikeObject {
174 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
175 let expects_str = self.expects_str;
180 let expects_str_and_is_ascii = expects_str && buf.is_ascii();
181
182 Python::with_gil(|py| {
183 let n_bytes = if expects_str_and_is_ascii {
184 let number_chars_written = unsafe {
185 self.inner.call_method(
186 py,
187 "write",
188 (PyString::new(py, std::str::from_utf8_unchecked(buf)),),
189 None,
190 )
191 }
192 .map_err(pyerr_to_io_err)?;
193 number_chars_written.extract(py).map_err(pyerr_to_io_err)?
194 } else if expects_str {
195 let number_chars_written = self
196 .inner
197 .call_method(
198 py,
199 "write",
200 (PyString::new(
201 py,
202 std::str::from_utf8(buf).map_err(io::Error::other)?,
203 ),),
204 None,
205 )
206 .map_err(pyerr_to_io_err)?;
207 let n_chars: usize = number_chars_written.extract(py).map_err(pyerr_to_io_err)?;
208 if n_chars > 0 {
210 std::str::from_utf8(buf)
211 .map(|str| {
212 str.char_indices()
213 .nth(n_chars - 1)
214 .map(|(i, ch)| i + ch.len_utf8())
215 .unwrap()
216 })
217 .expect("unable to parse buffer as utf-8")
218 } else {
219 0
220 }
221 } else {
222 let number_bytes_written = self
223 .inner
224 .call_method(py, "write", (PyBytes::new(py, buf),), None)
225 .map_err(pyerr_to_io_err)?;
226 number_bytes_written.extract(py).map_err(pyerr_to_io_err)?
227 };
228 Ok(n_bytes)
229 })
230 }
231
232 fn flush(&mut self) -> Result<(), io::Error> {
233 if self.has_flush {
234 Python::with_gil(|py| {
235 self.inner
236 .call_method(py, "flush", (), None)
237 .map_err(pyerr_to_io_err)
238 })?;
239 }
240
241 Ok(())
242 }
243}
244
245impl Seek for PyFileLikeObject {
246 fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
247 Python::with_gil(|py| {
248 let (whence, offset) = match pos {
249 SeekFrom::Start(i) => (0, i as i64),
250 SeekFrom::Current(i) => (1, i),
251 SeekFrom::End(i) => (2, i),
252 };
253
254 let new_position = self
255 .inner
256 .call_method(py, "seek", (offset, whence), None)
257 .map_err(pyerr_to_io_err)?;
258
259 new_position.extract(py).map_err(pyerr_to_io_err)
260 })
261 }
262}
263
/// Shorthand for a readable, writable and seekable value that is also `Sync` and `Send`.
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

// Native files and the Python wrapper can all be handed out as `dyn FileLike`.
impl FileLike for File {}
impl FileLike for ClosableFile {}
impl FileLike for PyFileLikeObject {}
// No methods are overridden here.
impl MmapBytesReader for PyFileLikeObject {}
270
/// A file that is backed either by a wrapped Python object or by a native Rust file.
pub(crate) enum EitherRustPythonFile {
    /// A Python file-like object driven through its Python methods.
    Py(PyFileLikeObject),
    /// A native file handle.
    Rust(ClosableFile),
}
275
276impl EitherRustPythonFile {
277 pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
278 match self {
279 EitherRustPythonFile::Py(f) => Box::new(f),
280 EitherRustPythonFile::Rust(f) => Box::new(f),
281 }
282 }
283
284 fn into_scan_source_input(self) -> PythonScanSourceInput {
285 match self {
286 EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
287 EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
288 }
289 }
290
291 pub(crate) fn into_writeable(self) -> Box<dyn DynWriteable> {
292 match self {
293 Self::Py(f) => Box::new(f),
294 Self::Rust(f) => Box::new(f),
295 }
296 }
297}
298
/// The forms a scan source received from Python can take after conversion.
pub(crate) enum PythonScanSourceInput {
    /// Bytes that are already in memory.
    Buffer(MemSlice),
    /// A path built from a Python string.
    Path(PlPath),
    /// An open native file.
    File(ClosableFile),
}
304
305pub(crate) fn try_get_pyfile(
306 py: Python<'_>,
307 py_f: Bound<'_, PyAny>,
308 write: bool,
309) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
310 let io = py.import("io")?;
311 let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
312 let encoding = py_f.getattr("encoding")?;
313 let encoding = encoding.extract::<Cow<str>>()?;
314 Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
315 };
316
317 #[cfg(target_family = "unix")]
318 if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
319 || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
320 || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
321 || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
322 || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
323 || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
324 && is_utf8_encoding(&py_f)?))
325 && if write {
326 py_f.call_method0("flush").is_ok()
328 } else {
329 py_f.call_method1("seek", (0, 1)).is_ok()
331 })
332 .then(|| {
333 py_f.getattr("fileno")
334 .and_then(|fileno| fileno.call0())
335 .and_then(|fileno| fileno.extract::<libc::c_int>())
336 .ok()
337 })
338 .flatten()
339 .map(|fileno| unsafe {
340 libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
357 })
358 .filter(|fileno| *fileno != -1)
359 .map(|fileno| fileno as RawFd)
360 {
361 return Ok((
362 EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
363 fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
366 ));
367 }
368
369 let py_f = if py_f
372 .is_instance(&io.getattr("TextIOWrapper").unwrap())
373 .unwrap_or_default()
374 {
375 if !is_utf8_encoding(&py_f)? {
376 return Err(PyPolarsErr::from(
377 polars_err!(InvalidOperation: "file encoding is not UTF-8"),
378 )
379 .into());
380 }
381 if write {
384 py_f.call_method0("flush")?;
385 } else {
386 py_f.call_method1("seek", (0, 1))?;
387 }
388 py_f.getattr("buffer")?
389 } else {
390 py_f
391 };
392 PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
393 let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
394 let has_flush = py_f
395 .getattr_opt("flush")?
396 .is_some_and(|flush| flush.is_callable());
397 let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
398 Ok((EitherRustPythonFile::Py(f), None))
399}
400
401pub(crate) fn get_python_scan_source_input(
402 py_f: PyObject,
403 write: bool,
404) -> PyResult<PythonScanSourceInput> {
405 Python::with_gil(|py| {
406 let py_f = py_f.into_bound(py);
407
408 let py_f = read_if_bytesio(py_f);
413
414 if let Ok(b) = py_f.downcast::<PyBytes>() {
416 return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
417 b.as_bytes(),
418 Arc::new(b.clone().unbind()),
420 )));
421 }
422
423 if let Ok(s) = py_f.extract::<Cow<str>>() {
424 let mut file_path = PlPath::new(&s);
425 if let Some(p) = file_path.as_ref().as_local_path() {
426 if p.starts_with("~/") {
427 file_path = PlPath::Local(resolve_homedir(&p).into());
428 }
429 }
430 Ok(PythonScanSourceInput::Path(file_path))
431 } else {
432 Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
433 }
434 })
435}
436
437fn get_either_buffer_or_path(
438 py_f: PyObject,
439 write: bool,
440) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
441 Python::with_gil(|py| {
442 let py_f = py_f.into_bound(py);
443 if let Ok(s) = py_f.extract::<Cow<str>>() {
444 let file_path = resolve_homedir(&&*s);
445 let f = if write {
446 create_file(&file_path).map_err(PyPolarsErr::from)?
447 } else {
448 polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
449 };
450 Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
451 } else {
452 try_get_pyfile(py, py_f, write)
453 }
454 })
455}
456
457pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
461 Ok(get_either_buffer_or_path(py_f, write)?.0)
462}
463
464pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
465 Ok(get_either_file(f, truncate)?.into_dyn())
466}
467
468fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
471 let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
472 if py_f.is_instance(&bytes_io).unwrap() {
473 let Ok(bytes) = py_f.call_method0("getvalue") else {
476 return py_f;
477 };
478 return bytes;
479 }
480 py_f
481}
482
483pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
485 get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
486}
487
488pub(crate) fn get_mmap_bytes_reader_and_path(
489 py_f: &Bound<PyAny>,
490) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
491 let py_f = read_if_bytesio(py_f.clone());
492
493 if let Ok(bytes) = py_f.downcast::<PyBytes>() {
495 Ok((
496 Box::new(Cursor::new(MemSlice::from_arc(
497 bytes.as_bytes(),
498 Arc::new(py_f.clone().unbind()),
499 ))),
500 None,
501 ))
502 }
503 else {
505 match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
506 (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
507 (EitherRustPythonFile::Py(f), path) => {
508 Ok((Box::new(Cursor::new(f.to_memslice())), path))
509 },
510 }
511 }
512}