1use crate::{
2 PyObject, PyObjectRef, PyResult, VirtualMachine,
3 builtins::{PyBaseExceptionRef, PyBytes, PyStr},
4 common::ascii,
5};
6use alloc::fmt;
7use core::ops;
8use std::io;
9
/// A destination for formatted output whose failure type is chosen by the
/// implementor.
///
/// The single required method has the same name and argument shape as the
/// `write_fmt` methods of `fmt::Write` and `io::Write`, which is the method
/// the `write!` family of macros invokes on its target.
pub trait Write {
    /// Error produced when a write fails.
    type Error;

    /// Writes the already-captured format arguments `args` to this destination.
    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error>;
}
14
/// Transparent newtype wrapper around a value of type `T`.
///
/// Because of `#[repr(transparent)]`, an `IoWriter<T>` has the same layout as
/// the `T` it wraps, which is what [`IoWriter::from_ref`] relies on.
#[repr(transparent)]
pub struct IoWriter<T>(pub T);

impl<T> IoWriter<T> {
    /// Reinterprets a mutable reference to `T` as a mutable reference to
    /// `IoWriter<T>` without moving or copying the value.
    pub fn from_ref(x: &mut T) -> &mut Self {
        let raw: *mut T = x;
        // SAFETY: `IoWriter<T>` is `#[repr(transparent)]` over its single
        // field of type `T`, so a valid, uniquely borrowed `T` is also a
        // valid, uniquely borrowed `IoWriter<T>`. The returned reference
        // takes over the lifetime of the incoming borrow `x`.
        unsafe { &mut *raw.cast::<Self>() }
    }
}
24
25impl<T> ops::Deref for IoWriter<T> {
26 type Target = T;
27 fn deref(&self) -> &T {
28 &self.0
29 }
30}
31impl<T> ops::DerefMut for IoWriter<T> {
32 fn deref_mut(&mut self) -> &mut T {
33 &mut self.0
34 }
35}
36
37impl<W> Write for IoWriter<W>
38where
39 W: io::Write,
40{
41 type Error = io::Error;
42 fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> io::Result<()> {
43 <W as io::Write>::write_fmt(&mut self.0, args)
44 }
45}
46
47impl Write for String {
48 type Error = fmt::Error;
49 fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
50 <Self as fmt::Write>::write_fmt(self, args)
51 }
52}
53
54pub struct PyWriter<'vm>(pub PyObjectRef, pub &'vm VirtualMachine);
55
56impl Write for PyWriter<'_> {
57 type Error = PyBaseExceptionRef;
58 fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error> {
59 let PyWriter(obj, vm) = self;
60 vm.call_method(obj, "write", (args.to_string(),)).map(drop)
61 }
62}
63
64pub fn file_readline(obj: &PyObject, size: Option<usize>, vm: &VirtualMachine) -> PyResult {
65 let args = size.map_or_else(Vec::new, |size| vec![vm.ctx.new_int(size).into()]);
66 let ret = vm.call_method(obj, "readline", args)?;
67 let eof_err = || {
68 vm.new_exception(
69 vm.ctx.exceptions.eof_error.to_owned(),
70 vec![vm.ctx.new_str(ascii!("EOF when reading a line")).into()],
71 )
72 };
73 let ret = match_class!(match ret {
74 s @ PyStr => {
75 let s_wtf8 = s.as_wtf8();
77 if s_wtf8.is_empty() {
78 return Err(eof_err());
79 }
80 if s_wtf8.as_bytes().last() == Some(&b'\n') {
82 let no_nl = &s_wtf8[..s_wtf8.len() - 1];
83 vm.ctx.new_str(no_nl).into()
84 } else {
85 s.into()
86 }
87 }
88 b @ PyBytes => {
89 let buf = b.as_bytes();
90 if buf.is_empty() {
91 return Err(eof_err());
92 }
93 if buf.last() == Some(&b'\n') {
94 vm.ctx.new_bytes(buf[..buf.len() - 1].to_owned()).into()
95 } else {
96 b.into()
97 }
98 }
99 _ => return Err(vm.new_type_error("object.readline() returned non-string".to_owned())),
100 });
101 Ok(ret)
102}