1use crate::{
2 builtins::{PyBaseExceptionRef, PyBytes, PyStr},
3 common::ascii,
4 PyObject, PyObjectRef, PyResult, VirtualMachine,
5};
6use std::{fmt, io, ops};
7
/// A formatting sink whose failure type is chosen by the implementor.
///
/// The single required method has the same name and shape as the
/// `write_fmt` methods of `std::io::Write` and `std::fmt::Write`, so the
/// standard `write!`/`writeln!` macros can target any implementor of this
/// trait.
pub trait Write {
    /// Error value reported when a write does not succeed.
    type Error;

    /// Writes the already-captured format arguments `args` into this sink.
    fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> Result<(), Self::Error>;
}
12
/// Thin newtype around a value of type `T`.
///
/// The wrapper is `#[repr(transparent)]`, i.e. it has exactly the layout of
/// its single public field; [`IoWriter::from_ref`] relies on that guarantee.
#[repr(transparent)]
pub struct IoWriter<T>(pub T);

impl<T> IoWriter<T> {
    /// Views a mutable reference to `T` as a mutable reference to
    /// `IoWriter<T>` without moving or copying the value.
    ///
    /// The returned reference borrows from `x` for the same lifetime.
    pub fn from_ref(x: &mut T) -> &mut Self {
        let wrapped = (x as *mut T).cast::<Self>();
        // SAFETY: `IoWriter<T>` is `#[repr(transparent)]` over its only field
        // of type `T`, so both types have identical layout. The pointer comes
        // from a live, unique `&mut T`, and the resulting `&mut Self` takes
        // over that borrow for the same lifetime.
        unsafe { &mut *wrapped }
    }
}
22
23impl<T> ops::Deref for IoWriter<T> {
24 type Target = T;
25 fn deref(&self) -> &T {
26 &self.0
27 }
28}
29impl<T> ops::DerefMut for IoWriter<T> {
30 fn deref_mut(&mut self) -> &mut T {
31 &mut self.0
32 }
33}
34
35impl<W> Write for IoWriter<W>
36where
37 W: io::Write,
38{
39 type Error = io::Error;
40 fn write_fmt(&mut self, args: fmt::Arguments) -> io::Result<()> {
41 <W as io::Write>::write_fmt(&mut self.0, args)
42 }
43}
44
45impl Write for String {
46 type Error = fmt::Error;
47 fn write_fmt(&mut self, args: fmt::Arguments) -> fmt::Result {
48 <String as fmt::Write>::write_fmt(self, args)
49 }
50}
51
52pub struct PyWriter<'vm>(pub PyObjectRef, pub &'vm VirtualMachine);
53
54impl Write for PyWriter<'_> {
55 type Error = PyBaseExceptionRef;
56 fn write_fmt(&mut self, args: fmt::Arguments) -> Result<(), Self::Error> {
57 let PyWriter(obj, vm) = self;
58 vm.call_method(obj, "write", (args.to_string(),)).map(drop)
59 }
60}
61
62pub fn file_readline(obj: &PyObject, size: Option<usize>, vm: &VirtualMachine) -> PyResult {
63 let args = size.map_or_else(Vec::new, |size| vec![vm.ctx.new_int(size).into()]);
64 let ret = vm.call_method(obj, "readline", args)?;
65 let eof_err = || {
66 vm.new_exception(
67 vm.ctx.exceptions.eof_error.to_owned(),
68 vec![vm.ctx.new_str(ascii!("EOF when reading a line")).into()],
69 )
70 };
71 let ret = match_class!(match ret {
72 s @ PyStr => {
73 let sval = s.as_str();
74 if sval.is_empty() {
75 return Err(eof_err());
76 }
77 if let Some(nonl) = sval.strip_suffix('\n') {
78 vm.ctx.new_str(nonl).into()
79 } else {
80 s.into()
81 }
82 }
83 b @ PyBytes => {
84 let buf = b.as_bytes();
85 if buf.is_empty() {
86 return Err(eof_err());
87 }
88 if buf.last() == Some(&b'\n') {
89 vm.ctx.new_bytes(buf[..buf.len() - 1].to_owned()).into()
90 } else {
91 b.into()
92 }
93 }
94 _ => return Err(vm.new_type_error("object.readline() returned non-string".to_owned())),
95 });
96 Ok(ret)
97}