pub(crate) use self::termios::module_def;
#[pymodule]
mod termios {
use crate::vm::{
PyObjectRef, PyResult, TryFromObject, VirtualMachine,
builtins::{PyBaseExceptionRef, PyBytes, PyInt, PyListRef, PyTypeRef},
common::os::ErrorExt,
convert::ToPyObject,
};
use termios::Termios;
#[cfg(any(target_os = "illumos", target_os = "solaris"))]
#[pyattr]
use libc::{CSTART, CSTOP, CSWTCH};
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
))]
#[pyattr]
use libc::{FIOASYNC, TIOCGETD, TIOCSETD};
#[pyattr]
use libc::{FIOCLEX, FIONBIO, TIOCGWINSZ, TIOCSWINSZ};
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
))]
#[pyattr]
use libc::{
FIONCLEX, FIONREAD, TIOCEXCL, TIOCM_CAR, TIOCM_CD, TIOCM_CTS, TIOCM_DSR, TIOCM_DTR,
TIOCM_LE, TIOCM_RI, TIOCM_RNG, TIOCM_RTS, TIOCM_SR, TIOCM_ST, TIOCMBIC, TIOCMBIS, TIOCMGET,
TIOCMSET, TIOCNXCL, TIOCSCTTY,
};
#[cfg(any(target_os = "android", target_os = "linux"))]
#[pyattr]
use libc::{
IBSHIFT, TCFLSH, TCGETA, TCGETS, TCSBRK, TCSETA, TCSETAF, TCSETAW, TCSETS, TCSETSF,
TCSETSW, TCXONC, TIOCGSERIAL, TIOCGSOFTCAR, TIOCINQ, TIOCLINUX, TIOCSSOFTCAR, XTABS,
};
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "linux",
target_os = "macos"
))]
#[pyattr]
use libc::{TIOCCONS, TIOCGPGRP, TIOCOUTQ, TIOCSPGRP, TIOCSTI};
#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "macos"))]
#[pyattr]
use libc::{
TIOCNOTTY, TIOCPKT, TIOCPKT_DATA, TIOCPKT_DOSTOP, TIOCPKT_FLUSHREAD, TIOCPKT_FLUSHWRITE,
TIOCPKT_NOSTOP, TIOCPKT_START, TIOCPKT_STOP,
};
#[cfg(any(
target_os = "android",
target_os = "freebsd",
target_os = "illumos",
target_os = "linux",
target_os = "macos",
target_os = "openbsd",
target_os = "solaris"
))]
#[pyattr]
use termios::os::target::TAB3;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
))]
#[pyattr]
use termios::os::target::TCSASOFT;
#[cfg(any(
target_os = "android",
target_os = "freebsd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "solaris"
))]
#[pyattr]
use termios::os::target::{B460800, B921600};
#[cfg(any(target_os = "android", target_os = "linux"))]
#[pyattr]
use termios::os::target::{
B500000, B576000, B1000000, B1152000, B1500000, B2000000, B2500000, B3000000, B3500000,
B4000000, CBAUDEX,
};
#[cfg(any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "macos",
target_os = "solaris"
))]
#[pyattr]
use termios::os::target::{
BS0, BS1, BSDLY, CR0, CR1, CR2, CR3, CRDLY, FF0, FF1, FFDLY, NL0, NL1, NLDLY, OFDEL, OFILL,
TAB1, TAB2, VT0, VT1, VTDLY,
};
#[cfg(any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "solaris"
))]
#[pyattr]
use termios::os::target::{CBAUD, CIBAUD, IUCLC, OLCUC, XCASE};
#[cfg(any(
target_os = "android",
target_os = "freebsd",
target_os = "illumos",
target_os = "linux",
target_os = "macos",
target_os = "solaris"
))]
#[pyattr]
use termios::os::target::{TAB0, TABDLY};
#[cfg(any(target_os = "android", target_os = "linux"))]
#[pyattr]
use termios::os::target::{VSWTC, VSWTC as VSWTCH};
#[cfg(any(target_os = "illumos", target_os = "solaris"))]
#[pyattr]
use termios::os::target::{VSWTCH, VSWTCH as VSWTC};
#[pyattr]
use termios::{
B0, B50, B75, B110, B134, B150, B200, B300, B600, B1200, B1800, B2400, B4800, B9600,
B19200, B38400, BRKINT, CLOCAL, CREAD, CS5, CS6, CS7, CS8, CSIZE, CSTOPB, ECHO, ECHOE,
ECHOK, ECHONL, HUPCL, ICANON, ICRNL, IEXTEN, IGNBRK, IGNCR, IGNPAR, INLCR, INPCK, ISIG,
ISTRIP, IXANY, IXOFF, IXON, NOFLSH, OCRNL, ONLCR, ONLRET, ONOCR, OPOST, PARENB, PARMRK,
PARODD, TCIFLUSH, TCIOFF, TCIOFLUSH, TCION, TCOFLUSH, TCOOFF, TCOON, TCSADRAIN, TCSAFLUSH,
TCSANOW, TOSTOP, VEOF, VEOL, VERASE, VINTR, VKILL, VMIN, VQUIT, VSTART, VSTOP, VSUSP,
VTIME,
os::target::{
B57600, B115200, B230400, CRTSCTS, ECHOCTL, ECHOKE, ECHOPRT, EXTA, EXTB, FLUSHO,
IMAXBEL, NCCS, PENDIN, VDISCARD, VEOL2, VLNEXT, VREPRINT, VWERASE,
},
};
#[pyfunction]
fn tcgetattr(fd: i32, vm: &VirtualMachine) -> PyResult<Vec<PyObjectRef>> {
let termios = Termios::from_fd(fd).map_err(|e| termios_error(e, vm))?;
let noncanon = (termios.c_lflag & termios::ICANON) == 0;
let cc = termios
.c_cc
.iter()
.enumerate()
.map(|(i, &c)| match i {
termios::VMIN | termios::VTIME if noncanon => vm.ctx.new_int(c).into(),
_ => vm.ctx.new_bytes(vec![c as _]).into(),
})
.collect::<Vec<_>>();
let out = vec![
termios.c_iflag.to_pyobject(vm),
termios.c_oflag.to_pyobject(vm),
termios.c_cflag.to_pyobject(vm),
termios.c_lflag.to_pyobject(vm),
termios::cfgetispeed(&termios).to_pyobject(vm),
termios::cfgetospeed(&termios).to_pyobject(vm),
vm.ctx.new_list(cc).into(),
];
Ok(out)
}
#[pyfunction]
fn tcsetattr(fd: i32, when: i32, attributes: PyListRef, vm: &VirtualMachine) -> PyResult<()> {
let [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] =
<&[PyObjectRef; 7]>::try_from(&*attributes.borrow_vec())
.map_err(|_| vm.new_type_error("tcsetattr, arg 3: must be 7 element list"))?
.clone();
let mut termios = Termios::from_fd(fd).map_err(|e| termios_error(e, vm))?;
termios.c_iflag = iflag.try_into_value(vm)?;
termios.c_oflag = oflag.try_into_value(vm)?;
termios.c_cflag = cflag.try_into_value(vm)?;
termios.c_lflag = lflag.try_into_value(vm)?;
termios::cfsetispeed(&mut termios, ispeed.try_into_value(vm)?)
.map_err(|e| termios_error(e, vm))?;
termios::cfsetospeed(&mut termios, ospeed.try_into_value(vm)?)
.map_err(|e| termios_error(e, vm))?;
let cc = PyListRef::try_from_object(vm, cc)?;
let cc = cc.borrow_vec();
let cc = <&[PyObjectRef; NCCS]>::try_from(&*cc).map_err(|_| {
vm.new_type_error(format!(
"tcsetattr: attributes[6] must be {NCCS} element list"
))
})?;
for (cc, x) in termios.c_cc.iter_mut().zip(cc.iter()) {
*cc = if let Some(c) = x
.downcast_ref::<PyBytes>()
.filter(|b| b.as_bytes().len() == 1)
{
c.as_bytes()[0] as _
} else if let Some(i) = x.downcast_ref::<PyInt>() {
i.try_to_primitive(vm)?
} else {
return Err(vm.new_type_error(
"tcsetattr: elements of attributes must be characters or integers",
));
};
}
termios::tcsetattr(fd, when, &termios).map_err(|e| termios_error(e, vm))?;
Ok(())
}
#[pyfunction]
fn tcsendbreak(fd: i32, duration: i32, vm: &VirtualMachine) -> PyResult<()> {
termios::tcsendbreak(fd, duration).map_err(|e| termios_error(e, vm))?;
Ok(())
}
#[pyfunction]
fn tcdrain(fd: i32, vm: &VirtualMachine) -> PyResult<()> {
termios::tcdrain(fd).map_err(|e| termios_error(e, vm))?;
Ok(())
}
#[pyfunction]
fn tcflush(fd: i32, queue: i32, vm: &VirtualMachine) -> PyResult<()> {
termios::tcflush(fd, queue).map_err(|e| termios_error(e, vm))?;
Ok(())
}
#[pyfunction]
fn tcflow(fd: i32, action: i32, vm: &VirtualMachine) -> PyResult<()> {
termios::tcflow(fd, action).map_err(|e| termios_error(e, vm))?;
Ok(())
}
fn termios_error(err: std::io::Error, vm: &VirtualMachine) -> PyBaseExceptionRef {
vm.new_os_subtype_error(
error_type(vm),
Some(err.posix_errno()),
vm.ctx.new_str(err.to_string()),
)
.upcast()
}
#[pyattr(name = "error", once)]
fn error_type(vm: &VirtualMachine) -> PyTypeRef {
vm.ctx.new_exception_type(
"termios",
"error",
Some(vec![vm.ctx.exceptions.os_error.to_owned()]),
)
}
}