rustpython-stdlib 0.5.0

RustPython standard libraries in Rust.
Documentation
// spell-checker:disable

pub(crate) use self::termios::module_def;

#[pymodule]
mod termios {
    use crate::vm::{
        PyObjectRef, PyResult, TryFromObject, VirtualMachine,
        builtins::{PyBaseExceptionRef, PyBytes, PyInt, PyListRef, PyTypeRef},
        common::os::ErrorExt,
        convert::ToPyObject,
    };
    use termios::Termios;

    // TODO: more ioctl numbers
    // NOTE: B2500000, B3000000, B3500000, B4000000, and CIBAUD
    //       are only available on specific architectures
    // TODO: supply these from <sys/ttydefaults.h> (please file an issue/PR upstream):
    //       CDEL, CDSUSP, CEOF, CEOL, CEOL2, CEOT, CERASE, CESC, CFLUSH, CINTR, CKILL, CLNEXT,
    //       CNUL, COMMON, CQUIT, CRPRNT, CSTART, CSTOP, CSUSP, CSWTCH, CWERASE
    // NOTE: the constant INIT_C_CC is most likely missing nowadays
    // TODO: supply these from <asm-generic/ioctl.h> (please file an issue/PR upstream):
    //       IOCSIZE_MASK, IOCSIZE_SHIFT
    // TODO: supply NCC from <asm-generic/termios.h> (please file an issue/PR upstream)
    // NOTE: I have only found NSWTCH on cygwin, so please alert the RustPython maintainers if it
    //       is present on your system
    // TODO: supply these from <bits/ioctl-types.h> or <linux/tty.h> (please file an issue/PR
    //       upstream):
    //       N_MOUSE, N_PPP, N_SLIP, N_STRIP, N_TTY
    // NOTE: some of these have incomplete coverage in rust libc (please file an issue/PR upstream)
    // TODO: possibly supply these from <asm-generic/ioctls.h> (please file an issue/PR upstream):
    //       TCSBRKP, TIOCGICOUNT, TIOCGLCKTRMIOS, TIOCSERCONFIG, TIOCSERGETLSR, TIOCSERGETMULTI,
    //       TIOCSERGSTRUCT, TIOCSERGWILD, TIOCSERSETMULTI, TIOCSERSWILD, TIOCSER_TEMT,
    //       TIOCSLCKTRMIOS, TIOCSSERIAL, TIOCTTYGSTRUCT
    #[cfg(any(target_os = "illumos", target_os = "solaris"))]
    #[pyattr]
    use libc::{CSTART, CSTOP, CSWTCH};
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    #[pyattr]
    use libc::{FIOASYNC, TIOCGETD, TIOCSETD};
    #[pyattr]
    use libc::{FIOCLEX, FIONBIO, TIOCGWINSZ, TIOCSWINSZ};
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "linux",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    #[pyattr]
    use libc::{
        FIONCLEX, FIONREAD, TIOCEXCL, TIOCM_CAR, TIOCM_CD, TIOCM_CTS, TIOCM_DSR, TIOCM_DTR,
        TIOCM_LE, TIOCM_RI, TIOCM_RNG, TIOCM_RTS, TIOCM_SR, TIOCM_ST, TIOCMBIC, TIOCMBIS, TIOCMGET,
        TIOCMSET, TIOCNXCL, TIOCSCTTY,
    };
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[pyattr]
    use libc::{
        IBSHIFT, TCFLSH, TCGETA, TCGETS, TCSBRK, TCSETA, TCSETAF, TCSETAW, TCSETS, TCSETSF,
        TCSETSW, TCXONC, TIOCGSERIAL, TIOCGSOFTCAR, TIOCINQ, TIOCLINUX, TIOCSSOFTCAR, XTABS,
    };
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "linux",
        target_os = "macos"
    ))]
    #[pyattr]
    use libc::{TIOCCONS, TIOCGPGRP, TIOCOUTQ, TIOCSPGRP, TIOCSTI};
    #[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "macos"))]
    #[pyattr]
    use libc::{
        TIOCNOTTY, TIOCPKT, TIOCPKT_DATA, TIOCPKT_DOSTOP, TIOCPKT_FLUSHREAD, TIOCPKT_FLUSHWRITE,
        TIOCPKT_NOSTOP, TIOCPKT_START, TIOCPKT_STOP,
    };
    #[cfg(any(
        target_os = "android",
        target_os = "freebsd",
        target_os = "illumos",
        target_os = "linux",
        target_os = "macos",
        target_os = "openbsd",
        target_os = "solaris"
    ))]
    #[pyattr]
    use termios::os::target::TAB3;
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    #[pyattr]
    use termios::os::target::TCSASOFT;
    #[cfg(any(
        target_os = "android",
        target_os = "freebsd",
        target_os = "illumos",
        target_os = "linux",
        target_os = "netbsd",
        target_os = "solaris"
    ))]
    #[pyattr]
    use termios::os::target::{B460800, B921600};
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[pyattr]
    use termios::os::target::{
        B500000, B576000, B1000000, B1152000, B1500000, B2000000, B2500000, B3000000, B3500000,
        B4000000, CBAUDEX,
    };
    #[cfg(any(
        target_os = "android",
        target_os = "illumos",
        target_os = "linux",
        target_os = "macos",
        target_os = "solaris"
    ))]
    #[pyattr]
    use termios::os::target::{
        BS0, BS1, BSDLY, CR0, CR1, CR2, CR3, CRDLY, FF0, FF1, FFDLY, NL0, NL1, NLDLY, OFDEL, OFILL,
        TAB1, TAB2, VT0, VT1, VTDLY,
    };
    #[cfg(any(
        target_os = "android",
        target_os = "illumos",
        target_os = "linux",
        target_os = "solaris"
    ))]
    #[pyattr]
    use termios::os::target::{CBAUD, CIBAUD, IUCLC, OLCUC, XCASE};
    #[cfg(any(
        target_os = "android",
        target_os = "freebsd",
        target_os = "illumos",
        target_os = "linux",
        target_os = "macos",
        target_os = "solaris"
    ))]
    #[pyattr]
    use termios::os::target::{TAB0, TABDLY};
    #[cfg(any(target_os = "android", target_os = "linux"))]
    #[pyattr]
    use termios::os::target::{VSWTC, VSWTC as VSWTCH};
    #[cfg(any(target_os = "illumos", target_os = "solaris"))]
    #[pyattr]
    use termios::os::target::{VSWTCH, VSWTCH as VSWTC};
    #[pyattr]
    use termios::{
        B0, B50, B75, B110, B134, B150, B200, B300, B600, B1200, B1800, B2400, B4800, B9600,
        B19200, B38400, BRKINT, CLOCAL, CREAD, CS5, CS6, CS7, CS8, CSIZE, CSTOPB, ECHO, ECHOE,
        ECHOK, ECHONL, HUPCL, ICANON, ICRNL, IEXTEN, IGNBRK, IGNCR, IGNPAR, INLCR, INPCK, ISIG,
        ISTRIP, IXANY, IXOFF, IXON, NOFLSH, OCRNL, ONLCR, ONLRET, ONOCR, OPOST, PARENB, PARMRK,
        PARODD, TCIFLUSH, TCIOFF, TCIOFLUSH, TCION, TCOFLUSH, TCOOFF, TCOON, TCSADRAIN, TCSAFLUSH,
        TCSANOW, TOSTOP, VEOF, VEOL, VERASE, VINTR, VKILL, VMIN, VQUIT, VSTART, VSTOP, VSUSP,
        VTIME,
        os::target::{
            B57600, B115200, B230400, CRTSCTS, ECHOCTL, ECHOKE, ECHOPRT, EXTA, EXTB, FLUSHO,
            IMAXBEL, NCCS, PENDIN, VDISCARD, VEOL2, VLNEXT, VREPRINT, VWERASE,
        },
    };

    // Python-visible `tcgetattr(fd)`.
    //
    // Loads the terminal attributes of `fd` through `Termios::from_fd` and returns seven
    // objects in this order: c_iflag, c_oflag, c_cflag, c_lflag, input speed, output speed,
    // and a list built from `c_cc`. Each `c_cc` entry becomes a one-byte bytes object, except
    // that the VMIN and VTIME slots become integers when ICANON is not set in `c_lflag`.
    //
    // A failure to load the attributes is reported through `termios_error`.
    #[pyfunction]
    fn tcgetattr(fd: i32, vm: &VirtualMachine) -> PyResult<Vec<PyObjectRef>> {
        let attrs = Termios::from_fd(fd).map_err(|e| termios_error(e, vm))?;
        let canonical = (attrs.c_lflag & termios::ICANON) != 0;

        let mut control_chars = Vec::with_capacity(attrs.c_cc.len());
        for (index, &ch) in attrs.c_cc.iter().enumerate() {
            // Only VMIN/VTIME are exposed numerically, and only outside canonical mode.
            let numeric_slot = !canonical && matches!(index, termios::VMIN | termios::VTIME);
            let item: PyObjectRef = if numeric_slot {
                vm.ctx.new_int(ch).into()
            } else {
                vm.ctx.new_bytes(vec![ch as _]).into()
            };
            control_chars.push(item);
        }

        Ok(vec![
            attrs.c_iflag.to_pyobject(vm),
            attrs.c_oflag.to_pyobject(vm),
            attrs.c_cflag.to_pyobject(vm),
            attrs.c_lflag.to_pyobject(vm),
            termios::cfgetispeed(&attrs).to_pyobject(vm),
            termios::cfgetospeed(&attrs).to_pyobject(vm),
            vm.ctx.new_list(control_chars).into(),
        ])
    }

    // Python-visible `tcsetattr(fd, when, attributes)`.
    //
    // `attributes` must be a list of exactly seven items: iflag, oflag, cflag, lflag, ispeed,
    // ospeed and cc. The current settings of `fd` are loaded first, overwritten field by field
    // with the supplied values, and then applied with `termios::tcsetattr(fd, when, ..)`.
    //
    // `cc` must itself be a list of exactly NCCS items, each either a bytes object of length
    // one or an integer. Shape and element-type violations raise a type error; failures
    // coming from the `termios` crate are reported through `termios_error`.
    #[pyfunction]
    fn tcsetattr(fd: i32, when: i32, attributes: PyListRef, vm: &VirtualMachine) -> PyResult<()> {
        // Copy the seven entries out so the list borrow ends before anything else runs.
        let fields: [PyObjectRef; 7] = {
            let items = attributes.borrow_vec();
            let Ok(seven) = <&[PyObjectRef; 7]>::try_from(&*items) else {
                return Err(vm.new_type_error("tcsetattr, arg 3: must be 7 element list"));
            };
            seven.clone()
        };
        let [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] = fields;

        let mut attrs = Termios::from_fd(fd).map_err(|e| termios_error(e, vm))?;
        attrs.c_iflag = iflag.try_into_value(vm)?;
        attrs.c_oflag = oflag.try_into_value(vm)?;
        attrs.c_cflag = cflag.try_into_value(vm)?;
        attrs.c_lflag = lflag.try_into_value(vm)?;
        termios::cfsetispeed(&mut attrs, ispeed.try_into_value(vm)?)
            .map_err(|e| termios_error(e, vm))?;
        termios::cfsetospeed(&mut attrs, ospeed.try_into_value(vm)?)
            .map_err(|e| termios_error(e, vm))?;

        let cc_list = PyListRef::try_from_object(vm, cc)?;
        let cc_items = cc_list.borrow_vec();
        let Ok(cc_slots) = <&[PyObjectRef; NCCS]>::try_from(&*cc_items) else {
            return Err(vm.new_type_error(format!(
                "tcsetattr: attributes[6] must be {NCCS} element list"
            )));
        };

        for (slot, item) in attrs.c_cc.iter_mut().zip(cc_slots.iter()) {
            // A bytes object counts only when it holds exactly one byte; any other bytes
            // object falls through to the integer check below.
            let single_byte = item
                .downcast_ref::<PyBytes>()
                .and_then(|b| match b.as_bytes() {
                    [only] => Some(*only),
                    _ => None,
                });
            if let Some(byte) = single_byte {
                *slot = byte as _;
                continue;
            }
            let Some(number) = item.downcast_ref::<PyInt>() else {
                return Err(vm.new_type_error(
                    "tcsetattr: elements of attributes must be characters or integers",
                ));
            };
            *slot = number.try_to_primitive(vm)?;
        }

        termios::tcsetattr(fd, when, &attrs).map_err(|e| termios_error(e, vm))
    }

    // Python-visible `tcsendbreak(fd, duration)`: forwards both arguments to
    // `termios::tcsendbreak` and reports a failure through `termios_error`.
    #[pyfunction]
    fn tcsendbreak(fd: i32, duration: i32, vm: &VirtualMachine) -> PyResult<()> {
        termios::tcsendbreak(fd, duration).map_err(|io_err| termios_error(io_err, vm))
    }

    // Python-visible `tcdrain(fd)`: forwards `fd` to `termios::tcdrain` and reports a
    // failure through `termios_error`.
    #[pyfunction]
    fn tcdrain(fd: i32, vm: &VirtualMachine) -> PyResult<()> {
        termios::tcdrain(fd).map_err(|io_err| termios_error(io_err, vm))
    }

    // Python-visible `tcflush(fd, queue)`: forwards both arguments to `termios::tcflush`
    // and reports a failure through `termios_error`.
    #[pyfunction]
    fn tcflush(fd: i32, queue: i32, vm: &VirtualMachine) -> PyResult<()> {
        termios::tcflush(fd, queue).map_err(|io_err| termios_error(io_err, vm))
    }

    // Python-visible `tcflow(fd, action)`: forwards both arguments to `termios::tcflow`
    // and reports a failure through `termios_error`.
    #[pyfunction]
    fn tcflow(fd: i32, action: i32, vm: &VirtualMachine) -> PyResult<()> {
        termios::tcflow(fd, action).map_err(|io_err| termios_error(io_err, vm))
    }

    // Builds the Python exception used for every failed call into the `termios` crate: an
    // instance of this module's `error` type carrying `err.posix_errno()` as its errno and
    // the error's display text as its message.
    fn termios_error(err: std::io::Error, vm: &VirtualMachine) -> PyBaseExceptionRef {
        let exc_type = error_type(vm);
        let errno = err.posix_errno();
        let message = vm.ctx.new_str(err.to_string());
        vm.new_os_subtype_error(exc_type, Some(errno), message)
            .upcast()
    }

    // Exposed to Python as the module attribute `error`: an exception type created with
    // module name "termios", type name "error", and the VM's `os_error` as its only base.
    #[pyattr(name = "error", once)]
    fn error_type(vm: &VirtualMachine) -> PyTypeRef {
        let bases = vec![vm.ctx.exceptions.os_error.to_owned()];
        vm.ctx.new_exception_type("termios", "error", Some(bases))
    }
}