1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10use std::sync::Arc;
11
12use polars::io::mmap::MmapBytesReader;
13use polars::prelude::file::DynWriteable;
14use polars::prelude::sync_on_close::SyncOnCloseType;
15use polars_error::polars_err;
16use polars_utils::create_file;
17use polars_utils::file::{ClosableFile, WriteClose};
18use polars_utils::mmap::MemSlice;
19use pyo3::IntoPyObjectExt;
20use pyo3::exceptions::PyTypeError;
21use pyo3::prelude::*;
22use pyo3::types::{PyBytes, PyString, PyStringMethods};
23
24use crate::error::PyPolarsErr;
25use crate::prelude::resolve_homedir;
26
/// Wrapper around a Python object that is driven through its Python-level
/// `read` / `write` / `seek` / `flush` methods.
pub(crate) struct PyFileLikeObject {
    /// The wrapped Python object on which all methods are invoked.
    inner: PyObject,
    /// When `true`, `write` hands the object a Python `str` instead of `bytes`.
    expects_str: bool,
    /// When `true`, `flush` forwards to the object's `flush()`; otherwise it does nothing.
    has_flush: bool,
}
34
35impl WriteClose for PyFileLikeObject {}
36impl DynWriteable for PyFileLikeObject {
37 fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
38 self as _
39 }
40 fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
41 self as _
42 }
43 fn close(self: Box<Self>) -> io::Result<()> {
44 Ok(())
45 }
46 fn sync_on_close(&mut self, _sync_on_close: SyncOnCloseType) -> io::Result<()> {
47 Ok(())
48 }
49}
50
51impl Clone for PyFileLikeObject {
52 fn clone(&self) -> Self {
53 Python::with_gil(|py| Self {
54 inner: self.inner.clone_ref(py),
55 expects_str: self.expects_str,
56 has_flush: self.has_flush,
57 })
58 }
59}
60
61impl PyFileLikeObject {
63 pub(crate) fn new(object: PyObject, expects_str: bool, has_flush: bool) -> Self {
67 PyFileLikeObject {
68 inner: object,
69 expects_str,
70 has_flush,
71 }
72 }
73
74 pub(crate) fn to_memslice(&self) -> MemSlice {
75 Python::with_gil(|py| {
76 let bytes = self
77 .inner
78 .call_method(py, "read", (), None)
79 .expect("no read method found");
80
81 if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
82 return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
83 }
84
85 if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
86 return match b.to_cow().expect("PyString is not valid UTF-8") {
87 Cow::Borrowed(v) => {
88 MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
89 },
90 Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
91 };
92 }
93
94 panic!("Expecting to be able to downcast into bytes from read result.");
95 })
96 }
97
98 pub(crate) fn ensure_requirements(
102 object: &Bound<PyAny>,
103 read: bool,
104 write: bool,
105 seek: bool,
106 ) -> PyResult<()> {
107 if read && object.getattr("read").is_err() {
108 return Err(PyErr::new::<PyTypeError, _>(
109 "Object does not have a .read() method.",
110 ));
111 }
112
113 if seek && object.getattr("seek").is_err() {
114 return Err(PyErr::new::<PyTypeError, _>(
115 "Object does not have a .seek() method.",
116 ));
117 }
118
119 if write && object.getattr("write").is_err() {
120 return Err(PyErr::new::<PyTypeError, _>(
121 "Object does not have a .write() method.",
122 ));
123 }
124
125 Ok(())
126 }
127}
128
129fn pyerr_to_io_err(e: PyErr) -> io::Error {
131 Python::with_gil(|py| {
132 let e_as_object: PyObject = e.into_py_any(py).unwrap();
133
134 match e_as_object.call_method(py, "__str__", (), None) {
135 Ok(repr) => match repr.extract::<String>(py) {
136 Ok(s) => io::Error::other(s),
137 Err(_e) => io::Error::other("An unknown error has occurred"),
138 },
139 Err(_) => io::Error::other("Err doesn't have __str__"),
140 }
141 })
142}
143
144impl Read for PyFileLikeObject {
145 fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
146 Python::with_gil(|py| {
147 let bytes = self
148 .inner
149 .call_method(py, "read", (buf.len(),), None)
150 .map_err(pyerr_to_io_err)?;
151
152 let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
153
154 if let Ok(bytes) = opt_bytes {
155 buf.write_all(bytes.as_bytes())?;
156
157 bytes.len().map_err(pyerr_to_io_err)
158 } else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
159 let s = s.to_cow().map_err(pyerr_to_io_err)?;
160 buf.write_all(s.as_bytes())?;
161 Ok(s.len())
162 } else {
163 Err(io::Error::new(
164 ErrorKind::InvalidInput,
165 polars_err!(InvalidOperation: "could not read from input"),
166 ))
167 }
168 })
169 }
170}
171
172impl Write for PyFileLikeObject {
173 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
174 Python::with_gil(|py| {
175 let number_bytes_written = if self.expects_str {
176 self.inner.call_method(
177 py,
178 "write",
179 (PyString::new(
180 py,
181 std::str::from_utf8(buf).map_err(io::Error::other)?,
182 ),),
183 None,
184 )
185 } else {
186 self.inner
187 .call_method(py, "write", (PyBytes::new(py, buf),), None)
188 }
189 .map_err(pyerr_to_io_err)?;
190
191 let n = number_bytes_written.extract(py).map_err(pyerr_to_io_err)?;
192
193 Ok(n)
194 })
195 }
196
197 fn flush(&mut self) -> Result<(), io::Error> {
198 if self.has_flush {
199 Python::with_gil(|py| {
200 self.inner
201 .call_method(py, "flush", (), None)
202 .map_err(pyerr_to_io_err)
203 })?;
204 }
205
206 Ok(())
207 }
208}
209
210impl Seek for PyFileLikeObject {
211 fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
212 Python::with_gil(|py| {
213 let (whence, offset) = match pos {
214 SeekFrom::Start(i) => (0, i as i64),
215 SeekFrom::Current(i) => (1, i),
216 SeekFrom::End(i) => (2, i),
217 };
218
219 let new_position = self
220 .inner
221 .call_method(py, "seek", (offset, whence), None)
222 .map_err(pyerr_to_io_err)?;
223
224 new_position.extract(py).map_err(pyerr_to_io_err)
225 })
226 }
227}
228
/// Umbrella trait for values that can be read, written and seeked, and that may be
/// shared with or sent to other threads.
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}

// Marker impls: the trait has no methods of its own.
impl FileLike for File {}
impl FileLike for ClosableFile {}
impl FileLike for PyFileLikeObject {}
// Nothing is overridden here, so only the trait's provided behaviour applies.
impl MmapBytesReader for PyFileLikeObject {}
235
/// A file handle that is either backed by a Python object or by a native Rust file.
pub(crate) enum EitherRustPythonFile {
    /// A Python object accessed through its Python-level methods.
    Py(PyFileLikeObject),
    /// A native file handle.
    Rust(ClosableFile),
}
240
241impl EitherRustPythonFile {
242 pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
243 match self {
244 EitherRustPythonFile::Py(f) => Box::new(f),
245 EitherRustPythonFile::Rust(f) => Box::new(f),
246 }
247 }
248
249 fn into_scan_source_input(self) -> PythonScanSourceInput {
250 match self {
251 EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
252 EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
253 }
254 }
255
256 pub(crate) fn into_writeable(self) -> Box<dyn DynWriteable> {
257 match self {
258 Self::Py(f) => Box::new(f),
259 Self::Rust(f) => Box::new(f),
260 }
261 }
262}
263
/// The forms in which a Python-provided scan source can be handed over.
pub(crate) enum PythonScanSourceInput {
    /// Bytes that are already in memory.
    Buffer(MemSlice),
    /// A filesystem path.
    Path(PathBuf),
    /// An already opened native file.
    File(ClosableFile),
}
269
270pub(crate) fn try_get_pyfile(
271 py: Python<'_>,
272 py_f: Bound<'_, PyAny>,
273 write: bool,
274) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
275 let io = py.import("io")?;
276 let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
277 let encoding = py_f.getattr("encoding")?;
278 let encoding = encoding.extract::<Cow<str>>()?;
279 Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
280 };
281
282 #[cfg(target_family = "unix")]
283 if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
284 || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
285 || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
286 || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
287 || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
288 || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
289 && is_utf8_encoding(&py_f)?))
290 && if write {
291 py_f.call_method0("flush").is_ok()
293 } else {
294 py_f.call_method1("seek", (0, 1)).is_ok()
296 })
297 .then(|| {
298 py_f.getattr("fileno")
299 .and_then(|fileno| fileno.call0())
300 .and_then(|fileno| fileno.extract::<libc::c_int>())
301 .ok()
302 })
303 .flatten()
304 .map(|fileno| unsafe {
305 libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
322 })
323 .filter(|fileno| *fileno != -1)
324 .map(|fileno| fileno as RawFd)
325 {
326 return Ok((
327 EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
328 fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
331 ));
332 }
333
334 let py_f = if py_f
337 .is_instance(&io.getattr("TextIOWrapper").unwrap())
338 .unwrap_or_default()
339 {
340 if !is_utf8_encoding(&py_f)? {
341 return Err(PyPolarsErr::from(
342 polars_err!(InvalidOperation: "file encoding is not UTF-8"),
343 )
344 .into());
345 }
346 if write {
349 py_f.call_method0("flush")?;
350 } else {
351 py_f.call_method1("seek", (0, 1))?;
352 }
353 py_f.getattr("buffer")?
354 } else {
355 py_f
356 };
357 PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
358 let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
359 let has_flush = py_f
360 .getattr_opt("flush")?
361 .is_some_and(|flush| flush.is_callable());
362 let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
363 Ok((EitherRustPythonFile::Py(f), None))
364}
365
366pub(crate) fn get_python_scan_source_input(
367 py_f: PyObject,
368 write: bool,
369) -> PyResult<PythonScanSourceInput> {
370 Python::with_gil(|py| {
371 let py_f = py_f.into_bound(py);
372
373 let py_f = read_if_bytesio(py_f);
378
379 if let Ok(b) = py_f.downcast::<PyBytes>() {
381 return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
382 b.as_bytes(),
383 Arc::new(b.clone().unbind()),
385 )));
386 }
387
388 if let Ok(s) = py_f.extract::<Cow<str>>() {
389 let file_path = resolve_homedir(&&*s);
390 Ok(PythonScanSourceInput::Path(file_path))
391 } else {
392 Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
393 }
394 })
395}
396
397fn get_either_buffer_or_path(
398 py_f: PyObject,
399 write: bool,
400) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
401 Python::with_gil(|py| {
402 let py_f = py_f.into_bound(py);
403 if let Ok(s) = py_f.extract::<Cow<str>>() {
404 let file_path = resolve_homedir(&&*s);
405 let f = if write {
406 create_file(&file_path).map_err(PyPolarsErr::from)?
407 } else {
408 polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
409 };
410 Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
411 } else {
412 try_get_pyfile(py, py_f, write)
413 }
414 })
415}
416
417pub(crate) fn get_either_file(py_f: PyObject, write: bool) -> PyResult<EitherRustPythonFile> {
421 Ok(get_either_buffer_or_path(py_f, write)?.0)
422}
423
424pub(crate) fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
425 Ok(get_either_file(f, truncate)?.into_dyn())
426}
427
428fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
431 let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
432 if py_f.is_instance(&bytes_io).unwrap() {
433 let Ok(bytes) = py_f.call_method0("getvalue") else {
436 return py_f;
437 };
438 return bytes;
439 }
440 py_f
441}
442
443pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
445 get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
446}
447
448pub(crate) fn get_mmap_bytes_reader_and_path(
449 py_f: &Bound<PyAny>,
450) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
451 let py_f = read_if_bytesio(py_f.clone());
452
453 if let Ok(bytes) = py_f.downcast::<PyBytes>() {
455 Ok((
456 Box::new(Cursor::new(MemSlice::from_arc(
457 bytes.as_bytes(),
458 Arc::new(py_f.clone().unbind()),
459 ))),
460 None,
461 ))
462 }
463 else {
465 match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
466 (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
467 (EitherRustPythonFile::Py(f), path) => {
468 Ok((Box::new(Cursor::new(f.to_memslice())), path))
469 },
470 }
471 }
472}