pub(crate) use fcntl::module_def;
#[pymodule]
mod fcntl {
use crate::vm::{
PyResult, VirtualMachine,
builtins::PyIntRef,
function::{ArgMemoryBuffer, ArgStrOrBytesLike, Either, OptionalArg},
stdlib::_io,
};
#[pyattr]
use libc::{F_GETFD, F_GETFL, F_SETFD, F_SETFL, FD_CLOEXEC};
#[cfg(not(target_os = "wasi"))]
#[pyattr]
use libc::{F_DUPFD, F_DUPFD_CLOEXEC, F_GETLK, F_SETLK, F_SETLKW};
#[cfg(not(any(target_os = "wasi", target_os = "redox")))]
#[pyattr]
use libc::{F_GETOWN, F_RDLCK, F_SETOWN, F_UNLCK, F_WRLCK, LOCK_EX, LOCK_NB, LOCK_SH, LOCK_UN};
#[cfg(target_vendor = "apple")]
#[pyattr]
use libc::{F_FULLFSYNC, F_NOCACHE};
#[cfg(target_os = "freebsd")]
#[pyattr]
use libc::{F_DUP2FD, F_DUP2FD_CLOEXEC};
#[cfg(any(target_os = "android", target_os = "linux"))]
#[pyattr]
use libc::{F_OFD_GETLK, F_OFD_SETLK, F_OFD_SETLKW};
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
#[pyattr]
use libc::{
F_ADD_SEALS, F_GET_SEALS, F_GETLEASE, F_GETPIPE_SZ, F_NOTIFY, F_SEAL_GROW, F_SEAL_SEAL,
F_SEAL_SHRINK, F_SEAL_WRITE, F_SETLEASE, F_SETPIPE_SZ,
};
#[cfg(any(target_os = "dragonfly", target_os = "netbsd", target_vendor = "apple"))]
#[pyattr]
use libc::F_GETPATH;
#[pyfunction]
fn fcntl(
_io::Fildes(fd): _io::Fildes,
cmd: i32,
arg: OptionalArg<Either<ArgStrOrBytesLike, PyIntRef>>,
vm: &VirtualMachine,
) -> PyResult {
let int = match arg {
OptionalArg::Present(Either::A(arg)) => {
let mut buf = [0u8; 1024];
let arg_len;
{
let s = arg.borrow_bytes();
arg_len = s.len();
buf.get_mut(..arg_len)
.ok_or_else(|| vm.new_value_error("fcntl string arg too long"))?
.copy_from_slice(&s)
}
let ret = unsafe { libc::fcntl(fd, cmd, buf.as_mut_ptr()) };
if ret < 0 {
return Err(vm.new_last_errno_error());
}
return Ok(vm.ctx.new_bytes(buf[..arg_len].to_vec()).into());
}
OptionalArg::Present(Either::B(i)) => i.as_u32_mask(),
OptionalArg::Missing => 0,
};
let ret = unsafe { libc::fcntl(fd, cmd, int as i32) };
if ret < 0 {
return Err(vm.new_last_errno_error());
}
Ok(vm.new_pyobj(ret))
}
#[pyfunction]
fn ioctl(
_io::Fildes(fd): _io::Fildes,
request: i64,
arg: OptionalArg<Either<Either<ArgMemoryBuffer, ArgStrOrBytesLike>, i32>>,
mutate_flag: OptionalArg<bool>,
vm: &VirtualMachine,
) -> PyResult {
let request = (request as u32) as libc::c_ulong;
let arg = arg.unwrap_or_else(|| Either::B(0));
match arg {
Either::A(buf_kind) => {
const BUF_SIZE: usize = 1024;
let mut buf = [0u8; BUF_SIZE + 1]; let mut fill_buf = |b: &[u8]| {
if b.len() > BUF_SIZE {
return Err(vm.new_value_error("fcntl string arg too long"));
}
buf[..b.len()].copy_from_slice(b);
Ok(b.len())
};
let buf_len = match buf_kind {
Either::A(rw_arg) => {
let mutate_flag = mutate_flag.unwrap_or(true);
let mut arg_buf = rw_arg.borrow_buf_mut();
if mutate_flag {
let ret =
unsafe { libc::ioctl(fd, request as _, arg_buf.as_mut_ptr()) };
if ret < 0 {
return Err(vm.new_last_errno_error());
}
return Ok(vm.ctx.new_int(ret).into());
}
fill_buf(&arg_buf)?
}
Either::B(ro_buf) => fill_buf(&ro_buf.borrow_bytes())?,
};
let ret = unsafe { libc::ioctl(fd, request as _, buf.as_mut_ptr()) };
if ret < 0 {
return Err(vm.new_last_errno_error());
}
Ok(vm.ctx.new_bytes(buf[..buf_len].to_vec()).into())
}
Either::B(i) => {
let ret = unsafe { libc::ioctl(fd, request as _, i) };
if ret < 0 {
return Err(vm.new_last_errno_error());
}
Ok(vm.ctx.new_int(ret).into())
}
}
}
#[cfg(not(any(target_os = "wasi", target_os = "redox")))]
#[pyfunction]
fn flock(_io::Fildes(fd): _io::Fildes, operation: i32, vm: &VirtualMachine) -> PyResult {
let ret = unsafe { libc::flock(fd, operation) };
if ret < 0 {
return Err(vm.new_last_errno_error());
}
Ok(vm.ctx.new_int(ret).into())
}
#[cfg(not(any(target_os = "wasi", target_os = "redox")))]
#[pyfunction]
fn lockf(
_io::Fildes(fd): _io::Fildes,
cmd: i32,
len: OptionalArg<PyIntRef>,
start: OptionalArg<PyIntRef>,
whence: OptionalArg<i32>,
vm: &VirtualMachine,
) -> PyResult {
macro_rules! try_into_l_type {
($l_type:path) => {
$l_type
.try_into()
.map_err(|e| vm.new_overflow_error(format!("{e}")))
};
}
let mut l: libc::flock = unsafe { core::mem::zeroed() };
l.l_type = if cmd == libc::LOCK_UN {
try_into_l_type!(libc::F_UNLCK)
} else if (cmd & libc::LOCK_SH) != 0 {
try_into_l_type!(libc::F_RDLCK)
} else if (cmd & libc::LOCK_EX) != 0 {
try_into_l_type!(libc::F_WRLCK)
} else {
return Err(vm.new_value_error("unrecognized lockf argument"));
}?;
l.l_start = match start {
OptionalArg::Present(s) => s.try_to_primitive(vm)?,
OptionalArg::Missing => 0,
};
l.l_len = match len {
OptionalArg::Present(l_) => l_.try_to_primitive(vm)?,
OptionalArg::Missing => 0,
};
l.l_whence = match whence {
OptionalArg::Present(w) => w
.try_into()
.map_err(|e| vm.new_overflow_error(format!("{e}")))?,
OptionalArg::Missing => 0,
};
let ret = unsafe {
libc::fcntl(
fd,
if (cmd & libc::LOCK_NB) != 0 {
libc::F_SETLK
} else {
libc::F_SETLKW
},
&l,
)
};
if ret < 0 {
return Err(vm.new_last_errno_error());
}
Ok(vm.ctx.new_int(ret).into())
}
}