1use std::borrow::Cow;
2#[cfg(target_family = "unix")]
3use std::fs;
4use std::fs::File;
5use std::io;
6use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7#[cfg(target_family = "unix")]
8use std::os::fd::{FromRawFd, RawFd};
9use std::path::PathBuf;
10
11use polars::io::mmap::MmapBytesReader;
12use polars::prelude::PlRefPath;
13use polars::prelude::file::{Writeable, WriteableTrait};
14use polars_buffer::{Buffer, SharedStorage};
15use polars_error::polars_err;
16use polars_utils::create_file;
17use pyo3::IntoPyObjectExt;
18use pyo3::exceptions::PyTypeError;
19use pyo3::prelude::*;
20use pyo3::types::{PyBytes, PyString, PyStringMethods};
21
22use crate::error::PyPolarsErr;
23use crate::prelude::resolve_homedir;
24use crate::utils::to_py_err;
25
26pub(crate) struct PyFileLikeObject {
27 inner: Py<PyAny>,
28 expects_str: bool,
30 has_flush: bool,
32}
33
34impl WriteableTrait for PyFileLikeObject {
35 fn close(&mut self) -> io::Result<()> {
36 Ok(())
37 }
38
39 fn sync_all(&self) -> std::io::Result<()> {
40 self.flush()
41 }
42
43 fn sync_data(&self) -> std::io::Result<()> {
44 self.flush()
45 }
46}
47
48impl Clone for PyFileLikeObject {
49 fn clone(&self) -> Self {
50 Python::attach(|py| Self {
51 inner: self.inner.clone_ref(py),
52 expects_str: self.expects_str,
53 has_flush: self.has_flush,
54 })
55 }
56}
57
58impl PyFileLikeObject {
60 pub(crate) fn new(object: Py<PyAny>, expects_str: bool, has_flush: bool) -> Self {
64 PyFileLikeObject {
65 inner: object,
66 expects_str,
67 has_flush,
68 }
69 }
70
71 pub(crate) fn to_buffer(&self) -> Buffer<u8> {
72 Python::attach(|py| {
73 let bytes = self
74 .inner
75 .call_method(py, "read", (), None)
76 .expect("no read method found");
77
78 if let Ok(b) = bytes.cast_bound::<PyBytes>(py) {
79 let slice = b.as_bytes();
81 let owner = bytes.clone_ref(py);
82 let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
83 return Buffer::from_storage(ss);
84 }
85
86 if let Ok(b) = bytes.cast_bound::<PyString>(py) {
87 return match b.to_cow().expect("PyString is not valid UTF-8") {
88 Cow::Borrowed(v) => {
89 let slice = v.as_bytes();
91 let owner = bytes.clone_ref(py);
92 let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
93 return Buffer::from_storage(ss);
94 },
95 Cow::Owned(v) => Buffer::from_vec(v.into_bytes()),
96 };
97 }
98
99 panic!("Expecting to be able to downcast into bytes from read result.");
100 })
101 }
102
103 pub(crate) fn ensure_requirements(
107 object: &Bound<PyAny>,
108 read: bool,
109 write: bool,
110 seek: bool,
111 ) -> PyResult<()> {
112 if read && object.getattr("read").is_err() {
113 return Err(PyErr::new::<PyTypeError, _>(
114 "Object does not have a .read() method.",
115 ));
116 }
117
118 if seek && object.getattr("seek").is_err() {
119 return Err(PyErr::new::<PyTypeError, _>(
120 "Object does not have a .seek() method.",
121 ));
122 }
123
124 if write && object.getattr("write").is_err() {
125 return Err(PyErr::new::<PyTypeError, _>(
126 "Object does not have a .write() method.",
127 ));
128 }
129
130 Ok(())
131 }
132
133 pub fn flush(&self) -> std::io::Result<()> {
134 if self.has_flush {
135 Python::attach(|py| {
136 self.inner
137 .call_method(py, "flush", (), None)
138 .map_err(pyerr_to_io_err)
139 })?;
140 }
141
142 Ok(())
143 }
144}
145
146fn pyerr_to_io_err(e: PyErr) -> io::Error {
148 Python::attach(|py| {
149 let e_as_object: Py<PyAny> = e.into_py_any(py).unwrap();
150
151 match e_as_object.call_method(py, "__str__", (), None) {
152 Ok(repr) => match repr.extract::<String>(py) {
153 Ok(s) => io::Error::other(s),
154 Err(_e) => io::Error::other("An unknown error has occurred"),
155 },
156 Err(_) => io::Error::other("Err doesn't have __str__"),
157 }
158 })
159}
160
161impl Read for PyFileLikeObject {
162 fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
163 Python::attach(|py| {
164 let bytes = self
165 .inner
166 .call_method(py, "read", (buf.len(),), None)
167 .map_err(pyerr_to_io_err)?;
168
169 let opt_bytes = bytes.cast_bound::<PyBytes>(py);
170
171 if let Ok(bytes) = opt_bytes {
172 buf.write_all(bytes.as_bytes())?;
173
174 bytes.len().map_err(pyerr_to_io_err)
175 } else if let Ok(s) = bytes.cast_bound::<PyString>(py) {
176 let s = s.to_cow().map_err(pyerr_to_io_err)?;
177 buf.write_all(s.as_bytes())?;
178 Ok(s.len())
179 } else {
180 Err(io::Error::new(
181 ErrorKind::InvalidInput,
182 polars_err!(InvalidOperation: "could not read from input"),
183 ))
184 }
185 })
186 }
187}
188
189impl Write for PyFileLikeObject {
190 fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
191 let expects_str = self.expects_str;
196 let expects_str_and_is_ascii = expects_str && buf.is_ascii();
197
198 Python::attach(|py| {
199 let n_bytes = if expects_str_and_is_ascii {
200 let number_chars_written = unsafe {
201 self.inner.call_method(
202 py,
203 "write",
204 (PyString::new(py, std::str::from_utf8_unchecked(buf)),),
205 None,
206 )
207 }
208 .map_err(pyerr_to_io_err)?;
209 number_chars_written.extract(py).map_err(pyerr_to_io_err)?
210 } else if expects_str {
211 let number_chars_written = self
212 .inner
213 .call_method(
214 py,
215 "write",
216 (PyString::new(
217 py,
218 std::str::from_utf8(buf).map_err(io::Error::other)?,
219 ),),
220 None,
221 )
222 .map_err(pyerr_to_io_err)?;
223 let n_chars: usize = number_chars_written.extract(py).map_err(pyerr_to_io_err)?;
224 if n_chars > 0 {
226 std::str::from_utf8(buf)
227 .map(|str| {
228 str.char_indices()
229 .nth(n_chars - 1)
230 .map(|(i, ch)| i + ch.len_utf8())
231 .unwrap()
232 })
233 .expect("unable to parse buffer as utf-8")
234 } else {
235 0
236 }
237 } else {
238 let number_bytes_written = self
239 .inner
240 .call_method(py, "write", (PyBytes::new(py, buf),), None)
241 .map_err(pyerr_to_io_err)?;
242 number_bytes_written.extract(py).map_err(pyerr_to_io_err)?
243 };
244 Ok(n_bytes)
245 })
246 }
247
248 fn flush(&mut self) -> Result<(), io::Error> {
249 Self::flush(self)
250 }
251}
252
253impl Seek for PyFileLikeObject {
254 fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
255 Python::attach(|py| {
256 let (whence, offset) = match pos {
257 SeekFrom::Start(i) => (0, i as i64),
258 SeekFrom::Current(i) => (1, i),
259 SeekFrom::End(i) => (2, i),
260 };
261
262 let new_position = self
263 .inner
264 .call_method(py, "seek", (offset, whence), None)
265 .map_err(pyerr_to_io_err)?;
266
267 new_position.extract(py).map_err(pyerr_to_io_err)
268 })
269 }
270}
271
/// Bundles the capabilities required of a file handle (`Read`, `Write`,
/// `Seek`, `Sync`, `Send`) into one trait so it can be used as
/// `Box<dyn FileLike>`.
pub(crate) trait FileLike
where
    Self: Read + Write + Seek + Sync + Send,
{
}
273
274impl FileLike for File {}
275impl FileLike for PyFileLikeObject {}
276impl MmapBytesReader for PyFileLikeObject {}
277
278pub(crate) enum EitherRustPythonFile {
279 Py(PyFileLikeObject),
280 Rust(std::fs::File),
281}
282
283impl EitherRustPythonFile {
284 pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
285 match self {
286 EitherRustPythonFile::Py(f) => Box::new(f),
287 EitherRustPythonFile::Rust(f) => Box::new(f),
288 }
289 }
290
291 fn into_scan_source_input(self) -> PythonScanSourceInput {
292 match self {
293 EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_buffer()),
294 EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
295 }
296 }
297
298 pub(crate) fn into_writeable(self) -> Writeable {
299 match self {
300 Self::Py(f) => Writeable::Dyn(Box::new(f)),
301 Self::Rust(f) => Writeable::Local(f),
302 }
303 }
304}
305
306pub(crate) enum PythonScanSourceInput {
307 Buffer(Buffer<u8>),
308 Path(PlRefPath),
309 File(std::fs::File),
310}
311
312pub(crate) fn try_get_pyfile(
313 py: Python<'_>,
314 py_f: Bound<'_, PyAny>,
315 write: bool,
316) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
317 let io = py.import("io")?;
318 let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
319 let encoding = py_f.getattr("encoding")?;
320 let encoding = encoding.extract::<Cow<str>>()?;
321 Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
322 };
323
324 #[cfg(target_family = "unix")]
325 if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
326 || (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
327 || py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
328 || py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
329 || py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
330 || (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
331 && is_utf8_encoding(&py_f)?))
332 && if write {
333 py_f.call_method0("flush").is_ok()
335 } else {
336 py_f.call_method1("seek", (0, 1)).is_ok()
338 })
339 .then(|| {
340 py_f.getattr("fileno")
341 .and_then(|fileno| fileno.call0())
342 .and_then(|fileno| fileno.extract::<libc::c_int>())
343 .ok()
344 })
345 .flatten()
346 .map(|fileno| unsafe {
347 libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
364 })
365 .filter(|fileno| *fileno != -1)
366 .map(|fileno| fileno as RawFd)
367 {
368 return Ok((
369 EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
370 fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
373 ));
374 }
375
376 let py_f = if py_f
379 .is_instance(&io.getattr("TextIOWrapper").unwrap())
380 .unwrap_or_default()
381 {
382 if !is_utf8_encoding(&py_f)? {
383 return Err(PyPolarsErr::from(
384 polars_err!(InvalidOperation: "file encoding is not UTF-8"),
385 )
386 .into());
387 }
388 if write {
391 py_f.call_method0("flush")?;
392 } else {
393 py_f.call_method1("seek", (0, 1))?;
394 }
395 py_f.getattr("buffer")?
396 } else {
397 py_f
398 };
399 PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
400 let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
401 let has_flush = py_f
402 .getattr_opt("flush")?
403 .is_some_and(|flush| flush.is_callable());
404 let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
405 Ok((EitherRustPythonFile::Py(f), None))
406}
407
408pub(crate) fn get_python_scan_source_input(
409 py_f: Py<PyAny>,
410 write: bool,
411) -> PyResult<PythonScanSourceInput> {
412 Python::attach(|py| {
413 let py_f = py_f.into_bound(py);
414
415 let py_f = read_if_bytesio(py_f);
420
421 if let Ok(b) = py_f.cast::<PyBytes>() {
423 let slice = b.as_bytes();
425 let owner = b.clone().unbind();
426 let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
427 let buffer = Buffer::from_storage(ss);
428 return Ok(PythonScanSourceInput::Buffer(buffer));
429 }
430
431 if let Ok(s) = py_f.extract::<Cow<str>>() {
432 let file_path = PlRefPath::try_from_path(resolve_homedir(s.as_ref()).as_ref())
433 .map_err(to_py_err)?;
434
435 Ok(PythonScanSourceInput::Path(file_path))
436 } else {
437 Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
438 }
439 })
440}
441
442fn get_either_buffer_or_path(
443 py_f: Py<PyAny>,
444 write: bool,
445) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
446 Python::attach(|py| {
447 let py_f = py_f.into_bound(py);
448 if let Ok(s) = py_f.extract::<Cow<str>>() {
449 let file_path = resolve_homedir(s.as_ref());
450 let f = if write {
451 create_file(&file_path).map_err(PyPolarsErr::from)?
452 } else {
453 polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
454 };
455 Ok((
456 EitherRustPythonFile::Rust(f.into()),
457 Some(file_path.into_owned()),
458 ))
459 } else {
460 try_get_pyfile(py, py_f, write)
461 }
462 })
463}
464
465pub(crate) fn get_either_file(py_f: Py<PyAny>, write: bool) -> PyResult<EitherRustPythonFile> {
469 Ok(get_either_buffer_or_path(py_f, write)?.0)
470}
471
472pub(crate) fn get_file_like(f: Py<PyAny>, truncate: bool) -> PyResult<Box<dyn FileLike>> {
473 Ok(get_either_file(f, truncate)?.into_dyn())
474}
475
476fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
479 let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
480 if py_f.is_instance(&bytes_io).unwrap() {
481 let Ok(bytes) = py_f.call_method0("getvalue") else {
484 return py_f;
485 };
486 return bytes;
487 }
488 py_f
489}
490
491pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
493 get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
494}
495
496pub(crate) fn get_mmap_bytes_reader_and_path(
497 py_f: &Bound<PyAny>,
498) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
499 let py_f = read_if_bytesio(py_f.clone());
500
501 if let Ok(bytes) = py_f.cast::<PyBytes>() {
503 let slice = bytes.as_bytes();
505 let owner = bytes.clone().unbind();
506 let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
507 Ok((Box::new(Cursor::new(Buffer::from_storage(ss))), None))
508 }
509 else {
511 match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
512 (EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
513 (EitherRustPythonFile::Py(f), path) => Ok((Box::new(Cursor::new(f.to_buffer())), path)),
514 }
515 }
516}