Skip to main content

rustpython_vm/
py_io.rs

1use crate::{
2    PyObject, PyObjectRef, PyResult, VirtualMachine,
3    builtins::{PyBaseExceptionRef, PyBytes, PyStr},
4    common::ascii,
5};
6use alloc::fmt;
7use core::ops;
8use std::io;
9
10pub trait Write {
11    type Error;
12    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error>;
13}
14
15#[repr(transparent)]
16pub struct IoWriter<T>(pub T);
17
18impl<T> IoWriter<T> {
19    pub fn from_ref(x: &mut T) -> &mut Self {
20        // SAFETY: IoWriter is repr(transparent) over T
21        unsafe { &mut *(x as *mut T as *mut Self) }
22    }
23}
24
25impl<T> ops::Deref for IoWriter<T> {
26    type Target = T;
27    fn deref(&self) -> &T {
28        &self.0
29    }
30}
31impl<T> ops::DerefMut for IoWriter<T> {
32    fn deref_mut(&mut self) -> &mut T {
33        &mut self.0
34    }
35}
36
37impl<W> Write for IoWriter<W>
38where
39    W: io::Write,
40{
41    type Error = io::Error;
42    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> io::Result<()> {
43        <W as io::Write>::write_fmt(&mut self.0, args)
44    }
45}
46
47impl Write for String {
48    type Error = fmt::Error;
49    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
50        <Self as fmt::Write>::write_fmt(self, args)
51    }
52}
53
54pub struct PyWriter<'vm>(pub PyObjectRef, pub &'vm VirtualMachine);
55
56impl Write for PyWriter<'_> {
57    type Error = PyBaseExceptionRef;
58    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error> {
59        let PyWriter(obj, vm) = self;
60        vm.call_method(obj, "write", (args.to_string(),)).map(drop)
61    }
62}
63
64pub fn file_readline(obj: &PyObject, size: Option<usize>, vm: &VirtualMachine) -> PyResult {
65    let args = size.map_or_else(Vec::new, |size| vec![vm.ctx.new_int(size).into()]);
66    let ret = vm.call_method(obj, "readline", args)?;
67    let eof_err = || {
68        vm.new_exception(
69            vm.ctx.exceptions.eof_error.to_owned(),
70            vec![vm.ctx.new_str(ascii!("EOF when reading a line")).into()],
71        )
72    };
73    let ret = match_class!(match ret {
74        s @ PyStr => {
75            // Use as_wtf8() to handle strings with surrogates (e.g., surrogateescape)
76            let s_wtf8 = s.as_wtf8();
77            if s_wtf8.is_empty() {
78                return Err(eof_err());
79            }
80            // '\n' is ASCII, so we can check bytes directly
81            if s_wtf8.as_bytes().last() == Some(&b'\n') {
82                let no_nl = &s_wtf8[..s_wtf8.len() - 1];
83                vm.ctx.new_str(no_nl).into()
84            } else {
85                s.into()
86            }
87        }
88        b @ PyBytes => {
89            let buf = b.as_bytes();
90            if buf.is_empty() {
91                return Err(eof_err());
92            }
93            if buf.last() == Some(&b'\n') {
94                vm.ctx.new_bytes(buf[..buf.len() - 1].to_owned()).into()
95            } else {
96                b.into()
97            }
98        }
99        _ => return Err(vm.new_type_error("object.readline() returned non-string".to_owned())),
100    });
101    Ok(ret)
102}