Skip to main content

rustpython_vm/stdlib/
os.rs

1// spell-checker:disable
2
3use crate::{
4    AsObject, Py, PyObjectRef, PyPayload, PyResult, TryFromObject, VirtualMachine,
5    builtins::{PyModule, PySet},
6    common::crt_fd,
7    convert::{IntoPyException, ToPyException, ToPyObject},
8    function::{ArgumentError, FromArgs, FuncArgs},
9};
10use std::{fs, io, path::Path};
11
12pub(crate) fn fs_metadata<P: AsRef<Path>>(
13    path: P,
14    follow_symlink: bool,
15) -> io::Result<fs::Metadata> {
16    if follow_symlink {
17        fs::metadata(path.as_ref())
18    } else {
19        fs::symlink_metadata(path.as_ref())
20    }
21}
22
23#[allow(dead_code)]
24#[derive(FromArgs, Default)]
25pub struct TargetIsDirectory {
26    #[pyarg(any, default = false)]
27    pub(crate) target_is_directory: bool,
28}
29
30cfg_if::cfg_if! {
31    if #[cfg(all(any(unix, target_os = "wasi"), not(target_os = "redox")))] {
32        use libc::AT_FDCWD;
33    } else {
34        const AT_FDCWD: i32 = -100;
35    }
36}
37const DEFAULT_DIR_FD: crt_fd::Borrowed<'static> = unsafe { crt_fd::Borrowed::borrow_raw(AT_FDCWD) };
38
39// XXX: AVAILABLE should be a bool, but we can't yet have it as a bool and just cast it to usize
40#[derive(Copy, Clone, PartialEq, Eq)]
41pub struct DirFd<'fd, const AVAILABLE: usize>(pub(crate) [crt_fd::Borrowed<'fd>; AVAILABLE]);
42
43impl<const AVAILABLE: usize> Default for DirFd<'_, AVAILABLE> {
44    fn default() -> Self {
45        Self([DEFAULT_DIR_FD; AVAILABLE])
46    }
47}
48
49// not used on all platforms
50#[allow(unused)]
51impl<'fd> DirFd<'fd, 1> {
52    #[inline(always)]
53    pub(crate) fn get_opt(self) -> Option<crt_fd::Borrowed<'fd>> {
54        let [fd] = self.0;
55        (fd != DEFAULT_DIR_FD).then_some(fd)
56    }
57
58    #[inline]
59    pub(crate) fn raw_opt(self) -> Option<i32> {
60        self.get_opt().map(|fd| fd.as_raw())
61    }
62
63    #[inline(always)]
64    pub(crate) const fn get(self) -> crt_fd::Borrowed<'fd> {
65        let [fd] = self.0;
66        fd
67    }
68}
69
70impl<const AVAILABLE: usize> FromArgs for DirFd<'_, AVAILABLE> {
71    fn from_args(vm: &VirtualMachine, args: &mut FuncArgs) -> Result<Self, ArgumentError> {
72        let fd = match args.take_keyword("dir_fd") {
73            Some(o) if vm.is_none(&o) => Ok(DEFAULT_DIR_FD),
74            None => Ok(DEFAULT_DIR_FD),
75            Some(o) => {
76                warn_if_bool_fd(&o, vm).map_err(Into::<ArgumentError>::into)?;
77                let fd = o.try_index_opt(vm).unwrap_or_else(|| {
78                    Err(vm.new_type_error(format!(
79                        "argument should be integer or None, not {}",
80                        o.class().name()
81                    )))
82                })?;
83                let fd = fd.try_to_primitive(vm)?;
84                unsafe { crt_fd::Borrowed::try_borrow_raw(fd) }
85            }
86        };
87        if AVAILABLE == 0 && fd.as_ref().is_ok_and(|&fd| fd != DEFAULT_DIR_FD) {
88            return Err(vm
89                .new_not_implemented_error("dir_fd unavailable on this platform")
90                .into());
91        }
92        let fd = fd.map_err(|e| e.to_pyexception(vm))?;
93        Ok(Self([fd; AVAILABLE]))
94    }
95}
96
97#[derive(FromArgs)]
98pub(super) struct FollowSymlinks(
99    #[pyarg(named, name = "follow_symlinks", default = true)] pub bool,
100);
101
102#[cfg(not(windows))]
103fn bytes_as_os_str<'a>(b: &'a [u8], vm: &VirtualMachine) -> PyResult<&'a std::ffi::OsStr> {
104    rustpython_common::os::bytes_as_os_str(b)
105        .map_err(|_| vm.new_unicode_decode_error("can't decode path for utf-8"))
106}
107
108pub(crate) fn warn_if_bool_fd(obj: &PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
109    use crate::class::StaticType;
110    if obj
111        .class()
112        .is(crate::builtins::bool_::PyBool::static_type())
113    {
114        crate::stdlib::_warnings::warn(
115            vm.ctx.exceptions.runtime_warning,
116            "bool is used as a file descriptor".to_owned(),
117            1,
118            vm,
119        )?;
120    }
121    Ok(())
122}
123
124impl TryFromObject for crt_fd::Owned {
125    fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
126        warn_if_bool_fd(&obj, vm)?;
127        let fd = crt_fd::Raw::try_from_object(vm, obj)?;
128        unsafe { crt_fd::Owned::try_from_raw(fd) }.map_err(|e| e.into_pyexception(vm))
129    }
130}
131
132impl TryFromObject for crt_fd::Borrowed<'_> {
133    fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
134        warn_if_bool_fd(&obj, vm)?;
135        let fd = crt_fd::Raw::try_from_object(vm, obj)?;
136        unsafe { crt_fd::Borrowed::try_borrow_raw(fd) }.map_err(|e| e.into_pyexception(vm))
137    }
138}
139
140impl ToPyObject for crt_fd::Owned {
141    fn to_pyobject(self, vm: &VirtualMachine) -> PyObjectRef {
142        self.into_raw().to_pyobject(vm)
143    }
144}
145
146impl ToPyObject for crt_fd::Borrowed<'_> {
147    fn to_pyobject(self, vm: &VirtualMachine) -> PyObjectRef {
148        self.as_raw().to_pyobject(vm)
149    }
150}
151
152#[pymodule(sub)]
153pub(super) mod _os {
154    use super::{DirFd, FollowSymlinks, SupportFunc};
155    #[cfg(windows)]
156    use crate::common::windows::ToWideString;
157    #[cfg(any(unix, windows))]
158    use crate::utils::ToCString;
159    use crate::{
160        AsObject, Py, PyObjectRef, PyPayload, PyRef, PyResult, TryFromObject,
161        builtins::{
162            PyBytesRef, PyGenericAlias, PyIntRef, PyStrRef, PyTuple, PyTupleRef, PyTypeRef,
163        },
164        common::{
165            crt_fd,
166            fileutils::StatStruct,
167            lock::{OnceCell, PyRwLock},
168            suppress_iph,
169        },
170        convert::{IntoPyException, ToPyObject},
171        exceptions::{OSErrorBuilder, ToOSErrorBuilder},
172        function::{ArgBytesLike, ArgMemoryBuffer, FsPath, FuncArgs, OptionalArg},
173        ospath::{OsPath, OsPathOrFd, OutputMode, PathConverter},
174        protocol::PyIterReturn,
175        recursion::ReprGuard,
176        types::{Destructor, IterNext, Iterable, PyStructSequence, Representable, SelfIter},
177        vm::VirtualMachine,
178    };
179    use core::time::Duration;
180    use crossbeam_utils::atomic::AtomicCell;
181    use rustpython_common::wtf8::Wtf8Buf;
182    use std::{env, fs, fs::OpenOptions, io, path::PathBuf, time::SystemTime};
183
184    const OPEN_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
185    pub(crate) const MKDIR_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
186    const STAT_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
187    const UTIME_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
188    pub(crate) const SYMLINK_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
189    pub(crate) const UNLINK_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
190    const RMDIR_DIR_FD: bool = cfg!(not(any(windows, target_os = "redox")));
191    const SCANDIR_FD: bool = cfg!(all(unix, not(target_os = "redox")));
192
193    #[pyattr]
194    use libc::{O_APPEND, O_CREAT, O_EXCL, O_RDONLY, O_RDWR, O_TRUNC, O_WRONLY};
195
196    #[pyattr]
197    pub(crate) const F_OK: u8 = 0;
198    #[pyattr]
199    pub(crate) const R_OK: u8 = 1 << 2;
200    #[pyattr]
201    pub(crate) const W_OK: u8 = 1 << 1;
202    #[pyattr]
203    pub(crate) const X_OK: u8 = 1 << 0;
204
205    // ST_RDONLY and ST_NOSUID flags for statvfs
206    #[cfg(all(unix, not(target_os = "redox")))]
207    #[pyattr]
208    const ST_RDONLY: libc::c_ulong = libc::ST_RDONLY;
209
210    #[cfg(all(unix, not(target_os = "redox")))]
211    #[pyattr]
212    const ST_NOSUID: libc::c_ulong = libc::ST_NOSUID;
213
214    #[pyfunction]
215    fn close(fileno: crt_fd::Owned) -> io::Result<()> {
216        crt_fd::close(fileno)
217    }
218
219    #[pyfunction]
220    fn closerange(fd_low: i32, fd_high: i32) {
221        for fileno in fd_low..fd_high {
222            if let Ok(fd) = unsafe { crt_fd::Owned::try_from_raw(fileno) } {
223                drop(fd);
224            }
225        }
226    }
227
228    #[cfg(any(unix, windows, target_os = "wasi"))]
229    #[derive(FromArgs)]
230    struct OpenArgs<'fd> {
231        path: OsPath,
232        flags: i32,
233        #[pyarg(any, default)]
234        mode: Option<i32>,
235        #[pyarg(flatten)]
236        dir_fd: DirFd<'fd, { OPEN_DIR_FD as usize }>,
237    }
238
239    #[pyfunction]
240    fn open(args: OpenArgs<'_>, vm: &VirtualMachine) -> PyResult<crt_fd::Owned> {
241        os_open(args.path, args.flags, args.mode, args.dir_fd, vm)
242    }
243
244    #[cfg(any(unix, windows, target_os = "wasi"))]
245    pub(crate) fn os_open(
246        name: OsPath,
247        flags: i32,
248        mode: Option<i32>,
249        dir_fd: DirFd<'_, { OPEN_DIR_FD as usize }>,
250        vm: &VirtualMachine,
251    ) -> PyResult<crt_fd::Owned> {
252        let mode = mode.unwrap_or(0o777);
253        #[cfg(windows)]
254        let fd = {
255            let [] = dir_fd.0;
256            let name = name.to_wide_cstring(vm)?;
257            let flags = flags | libc::O_NOINHERIT;
258            crt_fd::wopen(&name, flags, mode)
259        };
260        #[cfg(not(windows))]
261        let fd = {
262            let name = name.clone().into_cstring(vm)?;
263            #[cfg(not(target_os = "wasi"))]
264            let flags = flags | libc::O_CLOEXEC;
265            #[cfg(not(target_os = "redox"))]
266            if let Some(dir_fd) = dir_fd.get_opt() {
267                crt_fd::openat(dir_fd, &name, flags, mode)
268            } else {
269                crt_fd::open(&name, flags, mode)
270            }
271            #[cfg(target_os = "redox")]
272            {
273                let [] = dir_fd.0;
274                crt_fd::open(&name, flags, mode)
275            }
276        };
277        fd.map_err(|err| OSErrorBuilder::with_filename_from_errno(&err, name, vm))
278    }
279
280    #[pyfunction]
281    fn fsync(fd: crt_fd::Borrowed<'_>) -> io::Result<()> {
282        crt_fd::fsync(fd)
283    }
284
285    #[pyfunction]
286    fn read(fd: crt_fd::Borrowed<'_>, n: usize, vm: &VirtualMachine) -> PyResult<PyBytesRef> {
287        let mut buffer = vec![0u8; n];
288        loop {
289            match vm.allow_threads(|| crt_fd::read(fd, &mut buffer)) {
290                Ok(n) => {
291                    buffer.truncate(n);
292                    return Ok(vm.ctx.new_bytes(buffer));
293                }
294                Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
295                    vm.check_signals()?;
296                    continue;
297                }
298                Err(e) => return Err(e.into_pyexception(vm)),
299            }
300        }
301    }
302
303    #[pyfunction]
304    fn readinto(
305        fd: crt_fd::Borrowed<'_>,
306        buffer: ArgMemoryBuffer,
307        vm: &VirtualMachine,
308    ) -> PyResult<usize> {
309        buffer.with_ref(|buf| {
310            loop {
311                match vm.allow_threads(|| crt_fd::read(fd, buf)) {
312                    Ok(n) => return Ok(n),
313                    Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
314                        vm.check_signals()?;
315                        continue;
316                    }
317                    Err(e) => return Err(e.into_pyexception(vm)),
318                }
319            }
320        })
321    }
322
323    #[pyfunction]
324    fn write(fd: crt_fd::Borrowed<'_>, data: ArgBytesLike, vm: &VirtualMachine) -> PyResult<usize> {
325        data.with_ref(|b| {
326            loop {
327                match vm.allow_threads(|| crt_fd::write(fd, b)) {
328                    Ok(n) => return Ok(n),
329                    Err(e) if e.raw_os_error() == Some(libc::EINTR) => {
330                        vm.check_signals()?;
331                        continue;
332                    }
333                    Err(e) => return Err(e.into_pyexception(vm)),
334                }
335            }
336        })
337    }
338
339    #[cfg(not(windows))]
340    #[pyfunction]
341    fn mkdir(
342        path: OsPath,
343        mode: OptionalArg<i32>,
344        dir_fd: DirFd<'_, { MKDIR_DIR_FD as usize }>,
345        vm: &VirtualMachine,
346    ) -> PyResult<()> {
347        let mode = mode.unwrap_or(0o777);
348        let c_path = path.clone().into_cstring(vm)?;
349        #[cfg(not(target_os = "redox"))]
350        if let Some(fd) = dir_fd.raw_opt() {
351            let res = unsafe { libc::mkdirat(fd, c_path.as_ptr(), mode as _) };
352            return if res < 0 {
353                let err = crate::common::os::errno_io_error();
354                Err(OSErrorBuilder::with_filename(&err, path, vm))
355            } else {
356                Ok(())
357            };
358        }
359        #[cfg(target_os = "redox")]
360        let [] = dir_fd.0;
361        let res = unsafe { libc::mkdir(c_path.as_ptr(), mode as _) };
362        if res < 0 {
363            let err = crate::common::os::errno_io_error();
364            return Err(OSErrorBuilder::with_filename(&err, path, vm));
365        }
366        Ok(())
367    }
368
369    #[pyfunction]
370    fn mkdirs(path: PyStrRef, vm: &VirtualMachine) -> PyResult<()> {
371        let os_path = vm.fsencode(&path)?;
372        fs::create_dir_all(&*os_path).map_err(|err| err.into_pyexception(vm))
373    }
374
375    #[cfg(not(windows))]
376    #[pyfunction]
377    fn rmdir(
378        path: OsPath,
379        dir_fd: DirFd<'_, { RMDIR_DIR_FD as usize }>,
380        vm: &VirtualMachine,
381    ) -> PyResult<()> {
382        #[cfg(not(target_os = "redox"))]
383        if let Some(fd) = dir_fd.raw_opt() {
384            let c_path = path.clone().into_cstring(vm)?;
385            let res = unsafe { libc::unlinkat(fd, c_path.as_ptr(), libc::AT_REMOVEDIR) };
386            return if res < 0 {
387                let err = crate::common::os::errno_io_error();
388                Err(OSErrorBuilder::with_filename(&err, path, vm))
389            } else {
390                Ok(())
391            };
392        }
393        #[cfg(target_os = "redox")]
394        let [] = dir_fd.0;
395        fs::remove_dir(&path).map_err(|err| OSErrorBuilder::with_filename(&err, path, vm))
396    }
397
398    #[cfg(windows)]
399    #[pyfunction]
400    fn rmdir(path: OsPath, dir_fd: DirFd<'_, 0>, vm: &VirtualMachine) -> PyResult<()> {
401        let [] = dir_fd.0;
402        fs::remove_dir(&path).map_err(|err| OSErrorBuilder::with_filename(&err, path, vm))
403    }
404
405    const LISTDIR_FD: bool = cfg!(all(unix, not(target_os = "redox")));
406
407    #[pyfunction]
408    fn listdir(
409        path: OptionalArg<Option<OsPathOrFd<'_>>>,
410        vm: &VirtualMachine,
411    ) -> PyResult<Vec<PyObjectRef>> {
412        let path = path
413            .flatten()
414            .unwrap_or_else(|| OsPathOrFd::Path(OsPath::new_str(".")));
415        let list = match path {
416            OsPathOrFd::Path(path) => {
417                let dir_iter = match fs::read_dir(&path) {
418                    Ok(iter) => iter,
419                    Err(err) => {
420                        return Err(OSErrorBuilder::with_filename(&err, path, vm));
421                    }
422                };
423                let mode = path.mode();
424                dir_iter
425                    .map(|entry| match entry {
426                        Ok(entry_path) => Ok(mode.process_path(entry_path.file_name(), vm)),
427                        Err(err) => Err(OSErrorBuilder::with_filename(&err, path.clone(), vm)),
428                    })
429                    .collect::<PyResult<_>>()?
430            }
431            OsPathOrFd::Fd(fno) => {
432                #[cfg(not(all(unix, not(target_os = "redox"))))]
433                {
434                    let _ = fno;
435                    return Err(
436                        vm.new_not_implemented_error("can't pass fd to listdir on this platform")
437                    );
438                }
439                #[cfg(all(unix, not(target_os = "redox")))]
440                {
441                    use rustpython_common::os::ffi::OsStrExt;
442                    use std::os::unix::io::IntoRawFd;
443                    let new_fd = nix::unistd::dup(fno).map_err(|e| e.into_pyexception(vm))?;
444                    let raw_fd = new_fd.into_raw_fd();
445                    let dir = OwnedDir::from_fd(raw_fd).map_err(|e| {
446                        unsafe { libc::close(raw_fd) };
447                        e.into_pyexception(vm)
448                    })?;
449                    // OwnedDir::drop calls rewinddir (reset to start) then closedir.
450                    let mut list = Vec::new();
451                    loop {
452                        nix::errno::Errno::clear();
453                        let entry = unsafe { libc::readdir(dir.as_ptr()) };
454                        if entry.is_null() {
455                            let err = nix::errno::Errno::last();
456                            if err != nix::errno::Errno::UnknownErrno {
457                                return Err(io::Error::from(err).into_pyexception(vm));
458                            }
459                            break;
460                        }
461                        let fname = unsafe { core::ffi::CStr::from_ptr((*entry).d_name.as_ptr()) }
462                            .to_bytes();
463                        match fname {
464                            b"." | b".." => continue,
465                            _ => list.push(
466                                OutputMode::String
467                                    .process_path(std::ffi::OsStr::from_bytes(fname), vm),
468                            ),
469                        }
470                    }
471                    list
472                }
473            }
474        };
475        Ok(list)
476    }
477
478    #[cfg(not(windows))]
479    fn env_bytes_as_bytes(obj: &crate::function::Either<PyStrRef, PyBytesRef>) -> &[u8] {
480        match obj {
481            crate::function::Either::A(s) => s.as_bytes(),
482            crate::function::Either::B(b) => b.as_bytes(),
483        }
484    }
485
486    #[cfg(windows)]
487    unsafe extern "C" {
488        fn _wputenv(envstring: *const u16) -> libc::c_int;
489    }
490
491    /// Check if wide string length exceeds Windows environment variable limit.
492    #[cfg(windows)]
493    fn check_env_var_len(wide_len: usize, vm: &VirtualMachine) -> PyResult<()> {
494        use crate::common::windows::_MAX_ENV;
495        if wide_len > _MAX_ENV + 1 {
496            return Err(vm.new_value_error(format!(
497                "the environment variable is longer than {_MAX_ENV} characters",
498            )));
499        }
500        Ok(())
501    }
502
503    #[cfg(windows)]
504    #[pyfunction]
505    fn putenv(key: PyStrRef, value: PyStrRef, vm: &VirtualMachine) -> PyResult<()> {
506        let key_str = key.expect_str();
507        let value_str = value.expect_str();
508        // Search from index 1 because on Windows starting '=' is allowed for
509        // defining hidden environment variables.
510        if key_str.is_empty()
511            || key_str.get(1..).is_some_and(|s| s.contains('='))
512            || key_str.contains('\0')
513            || value_str.contains('\0')
514        {
515            return Err(vm.new_value_error("illegal environment variable name"));
516        }
517        let env_str = format!("{}={}", key_str, value_str);
518        let wide = env_str.to_wide_with_nul();
519        check_env_var_len(wide.len(), vm)?;
520
521        // Use _wputenv like CPython (not SetEnvironmentVariableW) to update CRT environ
522        let result = unsafe { suppress_iph!(_wputenv(wide.as_ptr())) };
523        if result != 0 {
524            return Err(vm.new_last_errno_error());
525        }
526        Ok(())
527    }
528
529    #[cfg(not(windows))]
530    #[pyfunction]
531    fn putenv(
532        key: crate::function::Either<PyStrRef, PyBytesRef>,
533        value: crate::function::Either<PyStrRef, PyBytesRef>,
534        vm: &VirtualMachine,
535    ) -> PyResult<()> {
536        let key = env_bytes_as_bytes(&key);
537        let value = env_bytes_as_bytes(&value);
538        if key.contains(&b'\0') || value.contains(&b'\0') {
539            return Err(vm.new_value_error("embedded null byte"));
540        }
541        if key.is_empty() || key.contains(&b'=') {
542            return Err(vm.new_value_error("illegal environment variable name"));
543        }
544        let key = super::bytes_as_os_str(key, vm)?;
545        let value = super::bytes_as_os_str(value, vm)?;
546        // SAFETY: requirements forwarded from the caller
547        unsafe { env::set_var(key, value) };
548        Ok(())
549    }
550
551    #[cfg(windows)]
552    #[pyfunction]
553    fn unsetenv(key: PyStrRef, vm: &VirtualMachine) -> PyResult<()> {
554        let key_str = key.expect_str();
555        // Search from index 1 because on Windows starting '=' is allowed for
556        // defining hidden environment variables.
557        if key_str.is_empty()
558            || key_str.get(1..).is_some_and(|s| s.contains('='))
559            || key_str.contains('\0')
560        {
561            return Err(vm.new_value_error("illegal environment variable name"));
562        }
563        // "key=" to unset (empty value removes the variable)
564        let env_str = format!("{}=", key_str);
565        let wide = env_str.to_wide_with_nul();
566        check_env_var_len(wide.len(), vm)?;
567
568        // Use _wputenv like CPython (not SetEnvironmentVariableW) to update CRT environ
569        let result = unsafe { suppress_iph!(_wputenv(wide.as_ptr())) };
570        if result != 0 {
571            return Err(vm.new_last_errno_error());
572        }
573        Ok(())
574    }
575
576    #[cfg(not(windows))]
577    #[pyfunction]
578    fn unsetenv(
579        key: crate::function::Either<PyStrRef, PyBytesRef>,
580        vm: &VirtualMachine,
581    ) -> PyResult<()> {
582        let key = env_bytes_as_bytes(&key);
583        if key.contains(&b'\0') {
584            return Err(vm.new_value_error("embedded null byte"));
585        }
586        if key.is_empty() || key.contains(&b'=') {
587            let x = vm.new_errno_error(
588                22,
589                format!(
590                    "Invalid argument: {}",
591                    core::str::from_utf8(key).unwrap_or("<bytes encoding failure>")
592                ),
593            );
594
595            return Err(x.upcast());
596        }
597        let key = super::bytes_as_os_str(key, vm)?;
598        // SAFETY: requirements forwarded from the caller
599        unsafe { env::remove_var(key) };
600        Ok(())
601    }
602
603    #[pyfunction]
604    fn readlink(path: OsPath, dir_fd: DirFd<'_, 0>, vm: &VirtualMachine) -> PyResult {
605        let mode = path.mode();
606        let [] = dir_fd.0;
607        let path =
608            fs::read_link(&path).map_err(|err| OSErrorBuilder::with_filename(&err, path, vm))?;
609        Ok(mode.process_path(path, vm))
610    }
611
612    #[pyattr]
613    #[pyclass(name)]
614    #[derive(Debug, PyPayload)]
615    struct DirEntry {
616        file_name: std::ffi::OsString,
617        pathval: PathBuf,
618        file_type: io::Result<fs::FileType>,
619        /// dirent d_type value, used when file_type is unavailable (fd-based scandir)
620        #[cfg(unix)]
621        d_type: Option<u8>,
622        /// Parent directory fd for fd-based scandir, used for fstatat
623        #[cfg(not(any(windows, target_os = "redox")))]
624        dir_fd: Option<crt_fd::Raw>,
625        mode: OutputMode,
626        stat: OnceCell<PyObjectRef>,
627        lstat: OnceCell<PyObjectRef>,
628        #[cfg(unix)]
629        ino: AtomicCell<u64>,
630        #[cfg(windows)]
631        ino: AtomicCell<Option<u128>>,
632        #[cfg(not(any(unix, windows)))]
633        ino: AtomicCell<Option<u64>>,
634    }
635
636    #[pyclass(flags(DISALLOW_INSTANTIATION), with(Representable))]
637    impl DirEntry {
638        #[pygetset]
639        fn name(&self, vm: &VirtualMachine) -> PyResult {
640            Ok(self.mode.process_path(&self.file_name, vm))
641        }
642
643        #[pygetset]
644        fn path(&self, vm: &VirtualMachine) -> PyResult {
645            Ok(self.mode.process_path(&self.pathval, vm))
646        }
647
648        /// Build the DirFd to use for stat calls.
649        /// If this entry was produced by fd-based scandir, use the stored dir_fd
650        /// so that fstatat(dir_fd, name, ...) is used instead of stat(full_path).
651        fn stat_dir_fd(&self) -> DirFd<'_, { STAT_DIR_FD as usize }> {
652            #[cfg(not(any(windows, target_os = "redox")))]
653            if let Some(raw_fd) = self.dir_fd {
654                // Safety: the fd came from os.open() and is borrowed for
655                // the lifetime of this DirEntry reference.
656                let borrowed = unsafe { crt_fd::Borrowed::borrow_raw(raw_fd) };
657                return DirFd([borrowed; STAT_DIR_FD as usize]);
658            }
659            DirFd::default()
660        }
661
662        /// Stat-based mode test fallback. Uses fstatat when dir_fd is available.
663        #[cfg(unix)]
664        fn test_mode_via_stat(
665            &self,
666            follow_symlinks: bool,
667            mode_bits: u32,
668            vm: &VirtualMachine,
669        ) -> PyResult<bool> {
670            match self.stat(self.stat_dir_fd(), FollowSymlinks(follow_symlinks), vm) {
671                Ok(stat_obj) => {
672                    let st_mode: i32 = stat_obj.get_attr("st_mode", vm)?.try_into_value(vm)?;
673                    #[allow(clippy::unnecessary_cast)]
674                    Ok((st_mode as u32 & libc::S_IFMT as u32) == mode_bits)
675                }
676                Err(e) => {
677                    if e.fast_isinstance(vm.ctx.exceptions.file_not_found_error) {
678                        Ok(false)
679                    } else {
680                        Err(e)
681                    }
682                }
683            }
684        }
685
686        #[pymethod]
687        fn is_dir(&self, follow_symlinks: FollowSymlinks, vm: &VirtualMachine) -> PyResult<bool> {
688            if let Ok(file_type) = &self.file_type
689                && (!follow_symlinks.0 || !file_type.is_symlink())
690            {
691                return Ok(file_type.is_dir());
692            }
693            #[cfg(unix)]
694            if let Some(dt) = self.d_type {
695                let is_symlink = dt == libc::DT_LNK;
696                let need_stat = dt == libc::DT_UNKNOWN || (follow_symlinks.0 && is_symlink);
697                if !need_stat {
698                    return Ok(dt == libc::DT_DIR);
699                }
700            }
701            #[cfg(unix)]
702            return self.test_mode_via_stat(follow_symlinks.0, libc::S_IFDIR as _, vm);
703            #[cfg(not(unix))]
704            match super::fs_metadata(&self.pathval, follow_symlinks.0) {
705                Ok(meta) => Ok(meta.is_dir()),
706                Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
707                Err(e) => Err(e.into_pyexception(vm)),
708            }
709        }
710
711        #[pymethod]
712        fn is_file(&self, follow_symlinks: FollowSymlinks, vm: &VirtualMachine) -> PyResult<bool> {
713            if let Ok(file_type) = &self.file_type
714                && (!follow_symlinks.0 || !file_type.is_symlink())
715            {
716                return Ok(file_type.is_file());
717            }
718            #[cfg(unix)]
719            if let Some(dt) = self.d_type {
720                let is_symlink = dt == libc::DT_LNK;
721                let need_stat = dt == libc::DT_UNKNOWN || (follow_symlinks.0 && is_symlink);
722                if !need_stat {
723                    return Ok(dt == libc::DT_REG);
724                }
725            }
726            #[cfg(unix)]
727            return self.test_mode_via_stat(follow_symlinks.0, libc::S_IFREG as _, vm);
728            #[cfg(not(unix))]
729            match super::fs_metadata(&self.pathval, follow_symlinks.0) {
730                Ok(meta) => Ok(meta.is_file()),
731                Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
732                Err(e) => Err(e.into_pyexception(vm)),
733            }
734        }
735
736        #[pymethod]
737        fn is_symlink(&self, vm: &VirtualMachine) -> PyResult<bool> {
738            if let Ok(file_type) = &self.file_type {
739                return Ok(file_type.is_symlink());
740            }
741            #[cfg(unix)]
742            if let Some(dt) = self.d_type
743                && dt != libc::DT_UNKNOWN
744            {
745                return Ok(dt == libc::DT_LNK);
746            }
747            #[cfg(unix)]
748            return self.test_mode_via_stat(false, libc::S_IFLNK as _, vm);
749            #[cfg(not(unix))]
750            match &self.file_type {
751                Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
752                Err(e) => {
753                    use crate::convert::ToPyException;
754                    Err(e.to_pyexception(vm))
755                }
756                Ok(_) => Ok(false),
757            }
758        }
759
760        #[pymethod]
761        fn stat(
762            &self,
763            dir_fd: DirFd<'_, { STAT_DIR_FD as usize }>,
764            follow_symlinks: FollowSymlinks,
765            vm: &VirtualMachine,
766        ) -> PyResult {
767            // Use stored dir_fd if the caller didn't provide one
768            let effective_dir_fd = if dir_fd == DirFd::default() {
769                self.stat_dir_fd()
770            } else {
771                dir_fd
772            };
773            let do_stat = |follow_symlinks| {
774                stat(
775                    OsPath {
776                        path: self.pathval.as_os_str().to_owned(),
777                        origin: None,
778                    }
779                    .into(),
780                    effective_dir_fd,
781                    FollowSymlinks(follow_symlinks),
782                    vm,
783                )
784            };
785            let lstat = || match self.lstat.get() {
786                Some(val) => Ok(val),
787                None => {
788                    let val = do_stat(false)?;
789                    let _ = self.lstat.set(val);
790                    Ok(self.lstat.get().unwrap())
791                }
792            };
793            let stat = if follow_symlinks.0 {
794                match self.stat.get() {
795                    Some(val) => val,
796                    None => {
797                        let val = if self.is_symlink(vm)? {
798                            do_stat(true)?
799                        } else {
800                            lstat()?.clone()
801                        };
802                        let _ = self.stat.set(val);
803                        self.stat.get().unwrap()
804                    }
805                }
806            } else {
807                lstat()?
808            };
809            Ok(stat.clone())
810        }
811
812        #[cfg(windows)]
813        #[pymethod]
814        fn inode(&self, vm: &VirtualMachine) -> PyResult<u128> {
815            match self.ino.load() {
816                Some(ino) => Ok(ino),
817                None => {
818                    let stat = stat_inner(
819                        OsPath::new_str(self.pathval.as_os_str()).into(),
820                        DirFd::default(),
821                        FollowSymlinks(false),
822                    )
823                    .map_err(|e| e.into_pyexception(vm))?
824                    .ok_or_else(|| crate::exceptions::cstring_error(vm))?;
825                    // On Windows, combine st_ino and st_ino_high into 128-bit value
826                    #[cfg(windows)]
827                    let ino: u128 = stat.st_ino as u128 | ((stat.st_ino_high as u128) << 64);
828                    #[cfg(not(windows))]
829                    let ino: u128 = stat.st_ino as u128;
830                    // Err(T) means other thread set `ino` at the mean time which is safe to ignore
831                    let _ = self.ino.compare_exchange(None, Some(ino));
832                    Ok(ino)
833                }
834            }
835        }
836
837        #[cfg(unix)]
838        #[pymethod]
839        fn inode(&self, _vm: &VirtualMachine) -> PyResult<u64> {
840            Ok(self.ino.load())
841        }
842
843        #[cfg(not(any(unix, windows)))]
844        #[pymethod]
845        fn inode(&self, _vm: &VirtualMachine) -> PyResult<Option<u64>> {
846            Ok(self.ino.load())
847        }
848
849        #[cfg(not(windows))]
850        #[pymethod]
851        const fn is_junction(&self, _vm: &VirtualMachine) -> PyResult<bool> {
852            Ok(false)
853        }
854
855        #[cfg(windows)]
856        #[pymethod]
857        fn is_junction(&self, _vm: &VirtualMachine) -> PyResult<bool> {
858            Ok(junction::exists(self.pathval.clone()).unwrap_or(false))
859        }
860
861        #[pymethod]
862        fn __fspath__(&self, vm: &VirtualMachine) -> PyResult {
863            self.path(vm)
864        }
865
866        #[pyclassmethod]
867        fn __class_getitem__(
868            cls: PyTypeRef,
869            args: PyObjectRef,
870            vm: &VirtualMachine,
871        ) -> PyGenericAlias {
872            PyGenericAlias::from_args(cls, args, vm)
873        }
874
875        #[pymethod]
876        fn __reduce__(&self, vm: &VirtualMachine) -> PyResult {
877            Err(vm.new_type_error("cannot pickle 'DirEntry' object"))
878        }
879    }
880
881    impl Representable for DirEntry {
882        #[inline]
883        fn repr_wtf8(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<Wtf8Buf> {
884            let name = match zelf.as_object().get_attr("name", vm) {
885                Ok(name) => Some(name),
886                Err(e)
887                    if e.fast_isinstance(vm.ctx.exceptions.attribute_error)
888                        || e.fast_isinstance(vm.ctx.exceptions.value_error) =>
889                {
890                    None
891                }
892                Err(e) => return Err(e),
893            };
894            if let Some(name) = name {
895                if let Some(_guard) = ReprGuard::enter(vm, zelf.as_object()) {
896                    let repr = name.repr(vm)?;
897                    let mut result = Wtf8Buf::from(format!("<{} ", zelf.class()));
898                    result.push_wtf8(repr.as_wtf8());
899                    result.push_char('>');
900                    Ok(result)
901                } else {
902                    Err(vm.new_runtime_error(format!(
903                        "reentrant call inside {}.__repr__",
904                        zelf.class()
905                    )))
906                }
907            } else {
908                Ok(Wtf8Buf::from(format!("<{}>", zelf.class())))
909            }
910        }
911    }
912    #[pyattr]
913    #[pyclass(name = "ScandirIter")]
914    #[derive(Debug, PyPayload)]
915    struct ScandirIterator {
916        entries: PyRwLock<Option<fs::ReadDir>>,
917        mode: OutputMode,
918    }
919
920    #[pyclass(flags(DISALLOW_INSTANTIATION), with(Destructor, IterNext, Iterable))]
921    impl ScandirIterator {
922        #[pymethod]
923        fn close(&self) {
924            let entryref: &mut Option<fs::ReadDir> = &mut self.entries.write();
925            let _dropped = entryref.take();
926        }
927
928        #[pymethod]
929        const fn __enter__(zelf: PyRef<Self>) -> PyRef<Self> {
930            zelf
931        }
932
933        #[pymethod]
934        fn __exit__(zelf: PyRef<Self>, _args: FuncArgs) {
935            zelf.close()
936        }
937
938        #[pymethod]
939        fn __reduce__(&self, vm: &VirtualMachine) -> PyResult {
940            Err(vm.new_type_error("cannot pickle 'ScandirIterator' object"))
941        }
942    }
943    impl Destructor for ScandirIterator {
944        fn del(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<()> {
945            // Emit ResourceWarning if the iterator is not yet exhausted/closed
946            if zelf.entries.read().is_some() {
947                let _ = crate::stdlib::_warnings::warn(
948                    vm.ctx.exceptions.resource_warning,
949                    format!("unclosed scandir iterator {:?}", zelf.as_object()),
950                    1,
951                    vm,
952                );
953                zelf.close();
954            }
955            Ok(())
956        }
957    }
958    impl SelfIter for ScandirIterator {}
959    impl IterNext for ScandirIterator {
960        fn next(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<PyIterReturn> {
961            let entryref: &mut Option<fs::ReadDir> = &mut zelf.entries.write();
962
963            match entryref {
964                None => Ok(PyIterReturn::StopIteration(None)),
965                Some(inner) => match inner.next() {
966                    Some(entry) => match entry {
967                        Ok(entry) => {
968                            #[cfg(unix)]
969                            let ino = {
970                                use std::os::unix::fs::DirEntryExt;
971                                entry.ino()
972                            };
973                            // TODO: wasi is nightly
974                            // #[cfg(target_os = "wasi")]
975                            // let ino = {
976                            //     use std::os::wasi::fs::DirEntryExt;
977                            //     entry.ino()
978                            // };
979                            #[cfg(not(unix))]
980                            let ino = None;
981
982                            let pathval = entry.path();
983
984                            // On Windows, pre-cache lstat from directory entry metadata
985                            // This allows stat() to return cached data even if file is removed
986                            #[cfg(windows)]
987                            let lstat = {
988                                let cell = OnceCell::new();
989                                if let Ok(stat_struct) =
990                                    crate::windows::win32_xstat(pathval.as_os_str(), false)
991                                {
992                                    let stat_obj =
993                                        StatResultData::from_stat(&stat_struct, vm).to_pyobject(vm);
994                                    let _ = cell.set(stat_obj);
995                                }
996                                cell
997                            };
998                            #[cfg(not(windows))]
999                            let lstat = OnceCell::new();
1000
1001                            Ok(PyIterReturn::Return(
1002                                DirEntry {
1003                                    file_name: entry.file_name(),
1004                                    pathval,
1005                                    file_type: entry.file_type(),
1006                                    #[cfg(unix)]
1007                                    d_type: None,
1008                                    #[cfg(not(any(windows, target_os = "redox")))]
1009                                    dir_fd: None,
1010                                    mode: zelf.mode,
1011                                    lstat,
1012                                    stat: OnceCell::new(),
1013                                    ino: AtomicCell::new(ino),
1014                                }
1015                                .into_ref(&vm.ctx)
1016                                .into(),
1017                            ))
1018                        }
1019                        Err(err) => Err(err.into_pyexception(vm)),
1020                    },
1021                    None => {
1022                        let _dropped = entryref.take();
1023                        Ok(PyIterReturn::StopIteration(None))
1024                    }
1025                },
1026            }
1027        }
1028    }
1029
1030    /// Wrapper around a raw `libc::DIR*` for fd-based scandir.
1031    #[cfg(all(unix, not(target_os = "redox")))]
1032    struct OwnedDir(core::ptr::NonNull<libc::DIR>);
1033
1034    #[cfg(all(unix, not(target_os = "redox")))]
1035    impl OwnedDir {
1036        fn from_fd(fd: crt_fd::Raw) -> io::Result<Self> {
1037            let ptr = unsafe { libc::fdopendir(fd) };
1038            core::ptr::NonNull::new(ptr)
1039                .map(OwnedDir)
1040                .ok_or_else(io::Error::last_os_error)
1041        }
1042
1043        fn as_ptr(&self) -> *mut libc::DIR {
1044            self.0.as_ptr()
1045        }
1046    }
1047
1048    #[cfg(all(unix, not(target_os = "redox")))]
1049    impl Drop for OwnedDir {
1050        fn drop(&mut self) {
1051            unsafe {
1052                libc::rewinddir(self.0.as_ptr());
1053                libc::closedir(self.0.as_ptr());
1054            }
1055        }
1056    }
1057
1058    #[cfg(all(unix, not(target_os = "redox")))]
1059    impl core::fmt::Debug for OwnedDir {
1060        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
1061            f.debug_tuple("OwnedDir").field(&self.0).finish()
1062        }
1063    }
1064
1065    // Safety: OwnedDir wraps a *mut libc::DIR. All access is synchronized
1066    // through the PyMutex in ScandirIteratorFd.
1067    #[cfg(all(unix, not(target_os = "redox")))]
1068    unsafe impl Send for OwnedDir {}
1069    #[cfg(all(unix, not(target_os = "redox")))]
1070    unsafe impl Sync for OwnedDir {}
1071
1072    #[cfg(all(unix, not(target_os = "redox")))]
1073    #[pyattr]
1074    #[pyclass(name = "ScandirIter")]
1075    #[derive(Debug, PyPayload)]
1076    struct ScandirIteratorFd {
1077        dir: crate::common::lock::PyMutex<Option<OwnedDir>>,
1078        /// The original fd passed to scandir(), stored in DirEntry for fstatat
1079        orig_fd: crt_fd::Raw,
1080    }
1081
1082    #[cfg(all(unix, not(target_os = "redox")))]
1083    #[pyclass(flags(DISALLOW_INSTANTIATION), with(Destructor, IterNext, Iterable))]
1084    impl ScandirIteratorFd {
1085        #[pymethod]
1086        fn close(&self) {
1087            let _dropped = self.dir.lock().take();
1088        }
1089
1090        #[pymethod]
1091        const fn __enter__(zelf: PyRef<Self>) -> PyRef<Self> {
1092            zelf
1093        }
1094
1095        #[pymethod]
1096        fn __exit__(zelf: PyRef<Self>, _args: FuncArgs) {
1097            zelf.close()
1098        }
1099
1100        #[pymethod]
1101        fn __reduce__(&self, vm: &VirtualMachine) -> PyResult {
1102            Err(vm.new_type_error("cannot pickle 'ScandirIterator' object"))
1103        }
1104    }
1105
1106    #[cfg(all(unix, not(target_os = "redox")))]
1107    impl Destructor for ScandirIteratorFd {
1108        fn del(zelf: &Py<Self>, vm: &VirtualMachine) -> PyResult<()> {
1109            if zelf.dir.lock().is_some() {
1110                let _ = crate::stdlib::_warnings::warn(
1111                    vm.ctx.exceptions.resource_warning,
1112                    format!("unclosed scandir iterator {:?}", zelf.as_object()),
1113                    1,
1114                    vm,
1115                );
1116                zelf.close();
1117            }
1118            Ok(())
1119        }
1120    }
1121
1122    #[cfg(all(unix, not(target_os = "redox")))]
1123    impl SelfIter for ScandirIteratorFd {}
1124
1125    #[cfg(all(unix, not(target_os = "redox")))]
1126    impl IterNext for ScandirIteratorFd {
1127        fn next(zelf: &crate::Py<Self>, vm: &VirtualMachine) -> PyResult<PyIterReturn> {
1128            use rustpython_common::os::ffi::OsStrExt;
1129            let mut guard = zelf.dir.lock();
1130            let dir = match guard.as_mut() {
1131                None => return Ok(PyIterReturn::StopIteration(None)),
1132                Some(dir) => dir,
1133            };
1134            loop {
1135                nix::errno::Errno::clear();
1136                let entry = unsafe {
1137                    let ptr = libc::readdir(dir.as_ptr());
1138                    if ptr.is_null() {
1139                        let err = nix::errno::Errno::last();
1140                        if err != nix::errno::Errno::UnknownErrno {
1141                            return Err(io::Error::from(err).into_pyexception(vm));
1142                        }
1143                        drop(guard.take());
1144                        return Ok(PyIterReturn::StopIteration(None));
1145                    }
1146                    &*ptr
1147                };
1148                let fname = unsafe { core::ffi::CStr::from_ptr(entry.d_name.as_ptr()) }.to_bytes();
1149                if fname == b"." || fname == b".." {
1150                    continue;
1151                }
1152                let file_name = std::ffi::OsString::from(std::ffi::OsStr::from_bytes(fname));
1153                let pathval = PathBuf::from(&file_name);
1154                #[cfg(target_os = "freebsd")]
1155                let ino = entry.d_fileno;
1156                #[cfg(not(target_os = "freebsd"))]
1157                let ino = entry.d_ino;
1158                let d_type = entry.d_type;
1159                return Ok(PyIterReturn::Return(
1160                    DirEntry {
1161                        file_name,
1162                        pathval,
1163                        file_type: Err(io::Error::other(
1164                            "file_type unavailable for fd-based scandir",
1165                        )),
1166                        d_type: if d_type == libc::DT_UNKNOWN {
1167                            None
1168                        } else {
1169                            Some(d_type)
1170                        },
1171                        dir_fd: Some(zelf.orig_fd),
1172                        mode: OutputMode::String,
1173                        lstat: OnceCell::new(),
1174                        stat: OnceCell::new(),
1175                        ino: AtomicCell::new(ino as _),
1176                    }
1177                    .into_ref(&vm.ctx)
1178                    .into(),
1179                ));
1180            }
1181        }
1182    }
1183
1184    #[pyfunction]
1185    fn scandir(path: OptionalArg<Option<OsPathOrFd<'_>>>, vm: &VirtualMachine) -> PyResult {
1186        let path = path
1187            .flatten()
1188            .unwrap_or_else(|| OsPathOrFd::Path(OsPath::new_str(".")));
1189        match path {
1190            OsPathOrFd::Path(path) => {
1191                let entries = fs::read_dir(&path.path)
1192                    .map_err(|err| OSErrorBuilder::with_filename(&err, path.clone(), vm))?;
1193                Ok(ScandirIterator {
1194                    entries: PyRwLock::new(Some(entries)),
1195                    mode: path.mode(),
1196                }
1197                .into_ref(&vm.ctx)
1198                .into())
1199            }
1200            OsPathOrFd::Fd(fno) => {
1201                #[cfg(not(all(unix, not(target_os = "redox"))))]
1202                {
1203                    let _ = fno;
1204                    Err(vm.new_not_implemented_error("can't pass fd to scandir on this platform"))
1205                }
1206                #[cfg(all(unix, not(target_os = "redox")))]
1207                {
1208                    use std::os::unix::io::IntoRawFd;
1209                    // closedir() closes the fd, so duplicate it first
1210                    let new_fd = nix::unistd::dup(fno).map_err(|e| e.into_pyexception(vm))?;
1211                    let raw_fd = new_fd.into_raw_fd();
1212                    let dir = OwnedDir::from_fd(raw_fd).map_err(|e| {
1213                        // fdopendir failed, close the dup'd fd
1214                        unsafe { libc::close(raw_fd) };
1215                        e.into_pyexception(vm)
1216                    })?;
1217                    Ok(ScandirIteratorFd {
1218                        dir: crate::common::lock::PyMutex::new(Some(dir)),
1219                        orig_fd: fno.as_raw(),
1220                    }
1221                    .into_ref(&vm.ctx)
1222                    .into())
1223                }
1224            }
1225        }
1226    }
1227
1228    #[derive(Debug, FromArgs)]
1229    #[pystruct_sequence_data]
1230    struct StatResultData {
1231        pub st_mode: PyIntRef,
1232        pub st_ino: PyIntRef,
1233        pub st_dev: PyIntRef,
1234        pub st_nlink: PyIntRef,
1235        pub st_uid: PyIntRef,
1236        pub st_gid: PyIntRef,
1237        pub st_size: PyIntRef,
1238        // Indices 7-9: integer seconds
1239        #[cfg_attr(target_env = "musl", allow(deprecated))]
1240        #[pyarg(positional, default)]
1241        #[pystruct_sequence(unnamed)]
1242        pub st_atime_int: libc::time_t,
1243        #[cfg_attr(target_env = "musl", allow(deprecated))]
1244        #[pyarg(positional, default)]
1245        #[pystruct_sequence(unnamed)]
1246        pub st_mtime_int: libc::time_t,
1247        #[cfg_attr(target_env = "musl", allow(deprecated))]
1248        #[pyarg(positional, default)]
1249        #[pystruct_sequence(unnamed)]
1250        pub st_ctime_int: libc::time_t,
1251        // Float time attributes
1252        #[pyarg(any, default)]
1253        #[pystruct_sequence(skip)]
1254        pub st_atime: f64,
1255        #[pyarg(any, default)]
1256        #[pystruct_sequence(skip)]
1257        pub st_mtime: f64,
1258        #[pyarg(any, default)]
1259        #[pystruct_sequence(skip)]
1260        pub st_ctime: f64,
1261        // Nanosecond attributes
1262        #[pyarg(any, default)]
1263        #[pystruct_sequence(skip)]
1264        pub st_atime_ns: i128,
1265        #[pyarg(any, default)]
1266        #[pystruct_sequence(skip)]
1267        pub st_mtime_ns: i128,
1268        #[pyarg(any, default)]
1269        #[pystruct_sequence(skip)]
1270        pub st_ctime_ns: i128,
1271        // Unix-specific attributes
1272        #[cfg(not(windows))]
1273        #[pyarg(any, default)]
1274        #[pystruct_sequence(skip)]
1275        pub st_blksize: i64,
1276        #[cfg(not(windows))]
1277        #[pyarg(any, default)]
1278        #[pystruct_sequence(skip)]
1279        pub st_blocks: i64,
1280        #[cfg(windows)]
1281        #[pyarg(any, default)]
1282        #[pystruct_sequence(skip)]
1283        pub st_reparse_tag: u32,
1284        #[cfg(windows)]
1285        #[pyarg(any, default)]
1286        #[pystruct_sequence(skip)]
1287        pub st_file_attributes: u32,
1288    }
1289
1290    impl StatResultData {
1291        fn from_stat(stat: &StatStruct, vm: &VirtualMachine) -> Self {
1292            let (atime, mtime, ctime);
1293            #[cfg(any(unix, windows))]
1294            #[cfg(not(any(target_os = "netbsd", target_os = "wasi")))]
1295            {
1296                atime = (stat.st_atime, stat.st_atime_nsec);
1297                mtime = (stat.st_mtime, stat.st_mtime_nsec);
1298                ctime = (stat.st_ctime, stat.st_ctime_nsec);
1299            }
1300            #[cfg(target_os = "netbsd")]
1301            {
1302                atime = (stat.st_atime, stat.st_atimensec);
1303                mtime = (stat.st_mtime, stat.st_mtimensec);
1304                ctime = (stat.st_ctime, stat.st_ctimensec);
1305            }
1306            #[cfg(target_os = "wasi")]
1307            {
1308                atime = (stat.st_atim.tv_sec, stat.st_atim.tv_nsec);
1309                mtime = (stat.st_mtim.tv_sec, stat.st_mtim.tv_nsec);
1310                ctime = (stat.st_ctim.tv_sec, stat.st_ctim.tv_nsec);
1311            }
1312
1313            const NANOS_PER_SEC: u32 = 1_000_000_000;
1314            let to_f64 = |(s, ns)| (s as f64) + (ns as f64) / (NANOS_PER_SEC as f64);
1315            let to_ns = |(s, ns)| s as i128 * NANOS_PER_SEC as i128 + ns as i128;
1316
1317            #[cfg(windows)]
1318            let st_reparse_tag = stat.st_reparse_tag;
1319            #[cfg(windows)]
1320            let st_file_attributes = stat.st_file_attributes;
1321
1322            // On Windows, combine st_ino and st_ino_high into a 128-bit value
1323            // like _pystat_l128_from_l64_l64
1324            #[cfg(windows)]
1325            let st_ino: u128 = stat.st_ino as u128 | ((stat.st_ino_high as u128) << 64);
1326            #[cfg(not(windows))]
1327            let st_ino = stat.st_ino;
1328
1329            #[cfg(not(windows))]
1330            #[allow(clippy::useless_conversion, reason = "needed for 32-bit platforms")]
1331            let st_blksize = i64::from(stat.st_blksize);
1332            #[cfg(not(windows))]
1333            #[allow(clippy::useless_conversion, reason = "needed for 32-bit platforms")]
1334            let st_blocks = i64::from(stat.st_blocks);
1335
1336            Self {
1337                st_mode: vm.ctx.new_pyref(stat.st_mode),
1338                st_ino: vm.ctx.new_pyref(st_ino),
1339                st_dev: vm.ctx.new_pyref(stat.st_dev),
1340                st_nlink: vm.ctx.new_pyref(stat.st_nlink),
1341                st_uid: vm.ctx.new_pyref(stat.st_uid),
1342                st_gid: vm.ctx.new_pyref(stat.st_gid),
1343                st_size: vm.ctx.new_pyref(stat.st_size),
1344                st_atime_int: atime.0,
1345                st_mtime_int: mtime.0,
1346                st_ctime_int: ctime.0,
1347                st_atime: to_f64(atime),
1348                st_mtime: to_f64(mtime),
1349                st_ctime: to_f64(ctime),
1350                st_atime_ns: to_ns(atime),
1351                st_mtime_ns: to_ns(mtime),
1352                st_ctime_ns: to_ns(ctime),
1353                #[cfg(not(windows))]
1354                st_blksize,
1355                #[cfg(not(windows))]
1356                st_blocks,
1357                #[cfg(windows)]
1358                st_reparse_tag,
1359                #[cfg(windows)]
1360                st_file_attributes,
1361            }
1362        }
1363    }
1364
1365    #[pyattr]
1366    #[pystruct_sequence(name = "stat_result", module = "os", data = "StatResultData")]
1367    struct PyStatResult;
1368
1369    #[pyclass(with(PyStructSequence))]
1370    impl PyStatResult {
1371        #[pyslot]
1372        fn slot_new(cls: PyTypeRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
1373            let seq: PyObjectRef = args.bind(vm)?;
1374            let result = crate::types::struct_sequence_new(cls.clone(), seq, vm)?;
1375            let tuple = result.downcast_ref::<PyTuple>().unwrap();
1376            let mut items: Vec<PyObjectRef> = tuple.to_vec();
1377
1378            // Copy integer time fields to hidden float timestamp slots when not provided.
1379            // indices 7-9: st_atime_int, st_mtime_int, st_ctime_int
1380            // i+3: st_atime/st_mtime/st_ctime (float timestamps, copied from int if missing)
1381            // i+6: st_atime_ns/st_mtime_ns/st_ctime_ns (left as None if not provided)
1382            for i in 7..=9 {
1383                if vm.is_none(&items[i + 3]) {
1384                    items[i + 3] = items[i].clone();
1385                }
1386            }
1387
1388            PyTuple::new_unchecked(items.into_boxed_slice())
1389                .into_ref_with_type(vm, cls)
1390                .map(Into::into)
1391        }
1392    }
1393
1394    #[cfg(windows)]
1395    fn stat_inner(
1396        file: OsPathOrFd<'_>,
1397        dir_fd: DirFd<'_, { STAT_DIR_FD as usize }>,
1398        follow_symlinks: FollowSymlinks,
1399    ) -> io::Result<Option<StatStruct>> {
1400        // TODO: replicate CPython's win32_xstat
1401        let [] = dir_fd.0;
1402        match file {
1403            OsPathOrFd::Path(path) => crate::windows::win32_xstat(&path.path, follow_symlinks.0),
1404            OsPathOrFd::Fd(fd) => crate::common::fileutils::fstat(fd),
1405        }
1406        .map(Some)
1407    }
1408
1409    #[cfg(not(windows))]
1410    fn stat_inner(
1411        file: OsPathOrFd<'_>,
1412        dir_fd: DirFd<'_, { STAT_DIR_FD as usize }>,
1413        follow_symlinks: FollowSymlinks,
1414    ) -> io::Result<Option<StatStruct>> {
1415        let mut stat = core::mem::MaybeUninit::uninit();
1416        let ret = match file {
1417            OsPathOrFd::Path(path) => {
1418                use rustpython_common::os::ffi::OsStrExt;
1419                let path = path.as_ref().as_os_str().as_bytes();
1420                let path = match alloc::ffi::CString::new(path) {
1421                    Ok(x) => x,
1422                    Err(_) => return Ok(None),
1423                };
1424
1425                #[cfg(not(target_os = "redox"))]
1426                let fstatat_ret = dir_fd.raw_opt().map(|dir_fd| {
1427                    let flags = if follow_symlinks.0 {
1428                        0
1429                    } else {
1430                        libc::AT_SYMLINK_NOFOLLOW
1431                    };
1432                    unsafe { libc::fstatat(dir_fd, path.as_ptr(), stat.as_mut_ptr(), flags) }
1433                });
1434                #[cfg(target_os = "redox")]
1435                let ([], fstatat_ret) = (dir_fd.0, None);
1436
1437                fstatat_ret.unwrap_or_else(|| {
1438                    if follow_symlinks.0 {
1439                        unsafe { libc::stat(path.as_ptr(), stat.as_mut_ptr()) }
1440                    } else {
1441                        unsafe { libc::lstat(path.as_ptr(), stat.as_mut_ptr()) }
1442                    }
1443                })
1444            }
1445            OsPathOrFd::Fd(fd) => unsafe { libc::fstat(fd.as_raw(), stat.as_mut_ptr()) },
1446        };
1447        if ret < 0 {
1448            return Err(io::Error::last_os_error());
1449        }
1450        Ok(Some(unsafe { stat.assume_init() }))
1451    }
1452
1453    #[pyfunction]
1454    #[pyfunction(name = "fstat")]
1455    fn stat(
1456        file: OsPathOrFd<'_>,
1457        dir_fd: DirFd<'_, { STAT_DIR_FD as usize }>,
1458        follow_symlinks: FollowSymlinks,
1459        vm: &VirtualMachine,
1460    ) -> PyResult {
1461        let stat = stat_inner(file.clone(), dir_fd, follow_symlinks)
1462            .map_err(|err| OSErrorBuilder::with_filename(&err, file, vm))?
1463            .ok_or_else(|| crate::exceptions::cstring_error(vm))?;
1464        Ok(StatResultData::from_stat(&stat, vm).to_pyobject(vm))
1465    }
1466
1467    #[pyfunction]
1468    fn lstat(
1469        file: OsPath,
1470        dir_fd: DirFd<'_, { STAT_DIR_FD as usize }>,
1471        vm: &VirtualMachine,
1472    ) -> PyResult {
1473        stat(file.into(), dir_fd, FollowSymlinks(false), vm)
1474    }
1475
1476    fn curdir_inner(vm: &VirtualMachine) -> PyResult<PathBuf> {
1477        env::current_dir().map_err(|err| err.into_pyexception(vm))
1478    }
1479
1480    #[pyfunction]
1481    fn getcwd(vm: &VirtualMachine) -> PyResult {
1482        Ok(OutputMode::String.process_path(curdir_inner(vm)?, vm))
1483    }
1484
1485    #[pyfunction]
1486    fn getcwdb(vm: &VirtualMachine) -> PyResult {
1487        Ok(OutputMode::Bytes.process_path(curdir_inner(vm)?, vm))
1488    }
1489
1490    #[pyfunction]
1491    fn chdir(path: OsPath, vm: &VirtualMachine) -> PyResult<()> {
1492        env::set_current_dir(&path.path)
1493            .map_err(|err| OSErrorBuilder::with_filename(&err, path, vm))?;
1494
1495        #[cfg(windows)]
1496        {
1497            // win32_wchdir()
1498
1499            // On Windows, set the per-drive CWD environment variable (=X:)
1500            // This is required for GetFullPathNameW to work correctly with drive-relative paths
1501
1502            use std::os::windows::ffi::OsStrExt;
1503            use windows_sys::Win32::System::Environment::SetEnvironmentVariableW;
1504
1505            if let Ok(cwd) = env::current_dir() {
1506                let cwd_str = cwd.as_os_str();
1507                let mut cwd_wide: Vec<u16> = cwd_str.encode_wide().collect();
1508
1509                // Check for UNC-like paths (\\server\share or //server/share)
1510                // wcsncmp(new_path, L"\\\\", 2) == 0 || wcsncmp(new_path, L"//", 2) == 0
1511                let is_unc_like_path = cwd_wide.len() >= 2
1512                    && ((cwd_wide[0] == b'\\' as u16 && cwd_wide[1] == b'\\' as u16)
1513                        || (cwd_wide[0] == b'/' as u16 && cwd_wide[1] == b'/' as u16));
1514
1515                if !is_unc_like_path {
1516                    // Create env var name "=X:" where X is the drive letter
1517                    let env_name: [u16; 4] = [b'=' as u16, cwd_wide[0], b':' as u16, 0];
1518                    cwd_wide.push(0); // null-terminate the path
1519                    unsafe {
1520                        SetEnvironmentVariableW(env_name.as_ptr(), cwd_wide.as_ptr());
1521                    }
1522                }
1523            }
1524        }
1525
1526        Ok(())
1527    }
1528
1529    #[pyfunction]
1530    fn fspath(path: PyObjectRef, vm: &VirtualMachine) -> PyResult<FsPath> {
1531        FsPath::try_from_path_like(path, false, vm)
1532    }
1533
1534    #[pyfunction]
1535    #[pyfunction(name = "replace")]
1536    fn rename(src: PyObjectRef, dst: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
1537        let src = PathConverter::new()
1538            .function("rename")
1539            .argument("src")
1540            .try_path(src, vm)?;
1541        let dst = PathConverter::new()
1542            .function("rename")
1543            .argument("dst")
1544            .try_path(dst, vm)?;
1545
1546        fs::rename(&src.path, &dst.path).map_err(|err| {
1547            let builder = err.to_os_error_builder(vm);
1548            let builder = builder.filename(src.filename(vm));
1549            let builder = builder.filename2(dst.filename(vm));
1550            builder.build(vm).upcast()
1551        })
1552    }
1553
1554    #[pyfunction]
1555    fn getpid(vm: &VirtualMachine) -> PyObjectRef {
1556        let pid = if cfg!(target_arch = "wasm32") {
1557            // Return an arbitrary value, greater than 1 which is special.
1558            // The value 42 is picked from wasi-libc
1559            // https://github.com/WebAssembly/wasi-libc/blob/wasi-sdk-21/libc-bottom-half/getpid/getpid.c
1560            42
1561        } else {
1562            std::process::id()
1563        };
1564        vm.ctx.new_int(pid).into()
1565    }
1566
1567    #[pyfunction]
1568    fn cpu_count(vm: &VirtualMachine) -> PyObjectRef {
1569        let cpu_count = num_cpus::get();
1570        vm.ctx.new_int(cpu_count).into()
1571    }
1572
1573    #[pyfunction]
1574    fn _exit(code: i32) {
1575        std::process::exit(code)
1576    }
1577
1578    #[pyfunction]
1579    fn abort() {
1580        unsafe extern "C" {
1581            fn abort();
1582        }
1583        unsafe { abort() }
1584    }
1585
1586    #[pyfunction]
1587    fn urandom(size: isize, vm: &VirtualMachine) -> PyResult<Vec<u8>> {
1588        if size < 0 {
1589            return Err(vm.new_value_error("negative argument not allowed"));
1590        }
1591        let mut buf = vec![0u8; size as usize];
1592        getrandom::fill(&mut buf).map_err(|e| io::Error::from(e).into_pyexception(vm))?;
1593        Ok(buf)
1594    }
1595
1596    #[pyfunction]
1597    pub fn isatty(fd: i32) -> bool {
1598        unsafe { suppress_iph!(libc::isatty(fd)) != 0 }
1599    }
1600
1601    #[pyfunction]
1602    pub fn lseek(
1603        fd: crt_fd::Borrowed<'_>,
1604        position: crt_fd::Offset,
1605        how: i32,
1606        vm: &VirtualMachine,
1607    ) -> PyResult<crt_fd::Offset> {
1608        #[cfg(not(windows))]
1609        let res = unsafe { suppress_iph!(libc::lseek(fd.as_raw(), position, how)) };
1610        #[cfg(windows)]
1611        let res = unsafe {
1612            use std::os::windows::io::AsRawHandle;
1613            use windows_sys::Win32::Storage::FileSystem;
1614            let handle = crt_fd::as_handle(fd).map_err(|e| e.into_pyexception(vm))?;
1615            let mut distance_to_move: [i32; 2] = core::mem::transmute(position);
1616            let ret = FileSystem::SetFilePointer(
1617                handle.as_raw_handle(),
1618                distance_to_move[0],
1619                &mut distance_to_move[1],
1620                how as _,
1621            );
1622            if ret == FileSystem::INVALID_SET_FILE_POINTER {
1623                -1
1624            } else {
1625                distance_to_move[0] = ret as _;
1626                core::mem::transmute::<[i32; 2], i64>(distance_to_move)
1627            }
1628        };
1629        if res < 0 {
1630            Err(vm.new_last_os_error())
1631        } else {
1632            Ok(res)
1633        }
1634    }
1635
1636    #[derive(FromArgs)]
1637    struct LinkArgs {
1638        #[pyarg(any)]
1639        src: OsPath,
1640        #[pyarg(any)]
1641        dst: OsPath,
1642        #[pyarg(named, name = "follow_symlinks", optional)]
1643        follow_symlinks: OptionalArg<bool>,
1644    }
1645
1646    #[pyfunction]
1647    fn link(args: LinkArgs, vm: &VirtualMachine) -> PyResult<()> {
1648        let LinkArgs {
1649            src,
1650            dst,
1651            follow_symlinks,
1652        } = args;
1653
1654        #[cfg(unix)]
1655        {
1656            use std::os::unix::ffi::OsStrExt;
1657            let src_cstr = alloc::ffi::CString::new(src.path.as_os_str().as_bytes())
1658                .map_err(|_| vm.new_value_error("embedded null byte"))?;
1659            let dst_cstr = alloc::ffi::CString::new(dst.path.as_os_str().as_bytes())
1660                .map_err(|_| vm.new_value_error("embedded null byte"))?;
1661
1662            let follow = follow_symlinks.into_option().unwrap_or(true);
1663            let flags = if follow { libc::AT_SYMLINK_FOLLOW } else { 0 };
1664
1665            let ret = unsafe {
1666                libc::linkat(
1667                    libc::AT_FDCWD,
1668                    src_cstr.as_ptr(),
1669                    libc::AT_FDCWD,
1670                    dst_cstr.as_ptr(),
1671                    flags,
1672                )
1673            };
1674
1675            if ret != 0 {
1676                let err = std::io::Error::last_os_error();
1677                let builder = err.to_os_error_builder(vm);
1678                let builder = builder.filename(src.filename(vm));
1679                let builder = builder.filename2(dst.filename(vm));
1680                return Err(builder.build(vm).upcast());
1681            }
1682
1683            Ok(())
1684        }
1685
1686        #[cfg(not(unix))]
1687        {
1688            let src_path = match follow_symlinks.into_option() {
1689                Some(true) => {
1690                    // Explicit follow_symlinks=True: resolve symlinks
1691                    fs::canonicalize(&src.path).unwrap_or_else(|_| PathBuf::from(src.path.clone()))
1692                }
1693                Some(false) | None => {
1694                    // Default or explicit no-follow: native hard_link behavior
1695                    PathBuf::from(src.path.clone())
1696                }
1697            };
1698
1699            fs::hard_link(&src_path, &dst.path).map_err(|err| {
1700                let builder = err.to_os_error_builder(vm);
1701                let builder = builder.filename(src.filename(vm));
1702                let builder = builder.filename2(dst.filename(vm));
1703                builder.build(vm).upcast()
1704            })
1705        }
1706    }
1707
1708    #[cfg(any(unix, windows))]
1709    #[pyfunction]
1710    fn system(command: PyStrRef, vm: &VirtualMachine) -> PyResult<i32> {
1711        let cstr = command.to_cstring(vm)?;
1712        let x = unsafe { libc::system(cstr.as_ptr()) };
1713        Ok(x)
1714    }
1715
1716    #[derive(FromArgs)]
1717    struct UtimeArgs<'fd> {
1718        path: OsPath,
1719        #[pyarg(any, default)]
1720        times: Option<PyTupleRef>,
1721        #[pyarg(named, default)]
1722        ns: Option<PyTupleRef>,
1723        #[pyarg(flatten)]
1724        dir_fd: DirFd<'fd, { UTIME_DIR_FD as usize }>,
1725        #[pyarg(flatten)]
1726        follow_symlinks: FollowSymlinks,
1727    }
1728
1729    #[pyfunction]
1730    fn utime(args: UtimeArgs<'_>, vm: &VirtualMachine) -> PyResult<()> {
1731        let parse_tup = |tup: &Py<PyTuple>| -> Option<(PyObjectRef, PyObjectRef)> {
1732            if tup.len() != 2 {
1733                None
1734            } else {
1735                Some((tup[0].clone(), tup[1].clone()))
1736            }
1737        };
1738        let (acc, modif) = match (args.times, args.ns) {
1739            (Some(t), None) => {
1740                let (a, m) = parse_tup(&t).ok_or_else(|| {
1741                    vm.new_type_error("utime: 'times' must be either a tuple of two ints or None")
1742                })?;
1743                (a.try_into_value(vm)?, m.try_into_value(vm)?)
1744            }
1745            (None, Some(ns)) => {
1746                let (a, m) = parse_tup(&ns)
1747                    .ok_or_else(|| vm.new_type_error("utime: 'ns' must be a tuple of two ints"))?;
1748                let ns_in_sec: PyObjectRef = vm.ctx.new_int(1_000_000_000).into();
1749                let ns_to_dur = |obj: PyObjectRef| {
1750                    let divmod = vm._divmod(&obj, &ns_in_sec)?;
1751                    let (div, rem) = divmod
1752                        .downcast_ref::<PyTuple>()
1753                        .and_then(parse_tup)
1754                        .ok_or_else(|| {
1755                            vm.new_type_error(format!(
1756                                "{}.__divmod__() must return a 2-tuple, not {}",
1757                                obj.class().name(),
1758                                divmod.class().name()
1759                            ))
1760                        })?;
1761                    let secs = div.try_index(vm)?.try_to_primitive(vm)?;
1762                    let ns = rem.try_index(vm)?.try_to_primitive(vm)?;
1763                    Ok(Duration::new(secs, ns))
1764                };
1765                // TODO: do validation to make sure this doesn't.. underflow?
1766                (ns_to_dur(a)?, ns_to_dur(m)?)
1767            }
1768            (None, None) => {
1769                let now = SystemTime::now();
1770                let now = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
1771                (now, now)
1772            }
1773            (Some(_), Some(_)) => {
1774                return Err(vm.new_value_error(
1775                    "utime: you may specify either 'times' or 'ns' but not both",
1776                ));
1777            }
1778        };
1779        utime_impl(args.path, acc, modif, args.dir_fd, args.follow_symlinks, vm)
1780    }
1781
1782    fn utime_impl(
1783        path: OsPath,
1784        acc: Duration,
1785        modif: Duration,
1786        dir_fd: DirFd<'_, { UTIME_DIR_FD as usize }>,
1787        _follow_symlinks: FollowSymlinks,
1788        vm: &VirtualMachine,
1789    ) -> PyResult<()> {
1790        #[cfg(any(target_os = "wasi", unix))]
1791        {
1792            #[cfg(not(target_os = "redox"))]
1793            {
1794                let path_for_err = path.clone();
1795                let path = path.into_cstring(vm)?;
1796
1797                let ts = |d: Duration| libc::timespec {
1798                    tv_sec: d.as_secs() as _,
1799                    tv_nsec: d.subsec_nanos() as _,
1800                };
1801                let times = [ts(acc), ts(modif)];
1802
1803                let ret = unsafe {
1804                    libc::utimensat(
1805                        dir_fd.get().as_raw(),
1806                        path.as_ptr(),
1807                        times.as_ptr(),
1808                        if _follow_symlinks.0 {
1809                            0
1810                        } else {
1811                            libc::AT_SYMLINK_NOFOLLOW
1812                        },
1813                    )
1814                };
1815                if ret < 0 {
1816                    Err(OSErrorBuilder::with_filename(
1817                        &io::Error::last_os_error(),
1818                        path_for_err,
1819                        vm,
1820                    ))
1821                } else {
1822                    Ok(())
1823                }
1824            }
1825            #[cfg(target_os = "redox")]
1826            {
1827                let [] = dir_fd.0;
1828
1829                let tv = |d: Duration| libc::timeval {
1830                    tv_sec: d.as_secs() as _,
1831                    tv_usec: d.as_micros() as _,
1832                };
1833                nix::sys::stat::utimes(path.as_ref(), &tv(acc).into(), &tv(modif).into())
1834                    .map_err(|err| err.into_pyexception(vm))
1835            }
1836        }
1837        #[cfg(windows)]
1838        {
1839            use std::{fs::OpenOptions, os::windows::prelude::*};
1840            type DWORD = u32;
1841            use windows_sys::Win32::{Foundation::FILETIME, Storage::FileSystem};
1842
1843            let [] = dir_fd.0;
1844
1845            if !_follow_symlinks.0 {
1846                return Err(vm.new_not_implemented_error(
1847                    "utime: follow_symlinks unavailable on this platform",
1848                ));
1849            }
1850
1851            let ft = |d: Duration| {
1852                let intervals = ((d.as_secs() as i64 + 11644473600) * 10_000_000)
1853                    + (d.subsec_nanos() as i64 / 100);
1854                FILETIME {
1855                    dwLowDateTime: intervals as DWORD,
1856                    dwHighDateTime: (intervals >> 32) as DWORD,
1857                }
1858            };
1859
1860            let acc = ft(acc);
1861            let modif = ft(modif);
1862
1863            let f = OpenOptions::new()
1864                .write(true)
1865                .custom_flags(windows_sys::Win32::Storage::FileSystem::FILE_FLAG_BACKUP_SEMANTICS)
1866                .open(&path)
1867                .map_err(|err| OSErrorBuilder::with_filename(&err, path.clone(), vm))?;
1868
1869            let ret = unsafe {
1870                FileSystem::SetFileTime(f.as_raw_handle() as _, core::ptr::null(), &acc, &modif)
1871            };
1872
1873            if ret == 0 {
1874                Err(OSErrorBuilder::with_filename(
1875                    &io::Error::last_os_error(),
1876                    path,
1877                    vm,
1878                ))
1879            } else {
1880                Ok(())
1881            }
1882        }
1883    }
1884
1885    #[cfg(all(any(unix, windows), not(target_os = "redox")))]
1886    #[derive(Debug)]
1887    #[pystruct_sequence_data]
1888    struct TimesResultData {
1889        pub user: f64,
1890        pub system: f64,
1891        pub children_user: f64,
1892        pub children_system: f64,
1893        pub elapsed: f64,
1894    }
1895
1896    #[cfg(all(any(unix, windows), not(target_os = "redox")))]
1897    #[pyattr]
1898    #[pystruct_sequence(name = "times_result", module = "os", data = "TimesResultData")]
1899    struct PyTimesResult;
1900
1901    #[cfg(all(any(unix, windows), not(target_os = "redox")))]
1902    #[pyclass(with(PyStructSequence))]
1903    impl PyTimesResult {}
1904
1905    #[cfg(all(any(unix, windows), not(target_os = "redox")))]
1906    #[pyfunction]
1907    fn times(vm: &VirtualMachine) -> PyResult {
1908        #[cfg(windows)]
1909        {
1910            use core::mem::MaybeUninit;
1911            use windows_sys::Win32::{Foundation::FILETIME, System::Threading};
1912
1913            let mut _create = MaybeUninit::<FILETIME>::uninit();
1914            let mut _exit = MaybeUninit::<FILETIME>::uninit();
1915            let mut kernel = MaybeUninit::<FILETIME>::uninit();
1916            let mut user = MaybeUninit::<FILETIME>::uninit();
1917
1918            unsafe {
1919                let h_proc = Threading::GetCurrentProcess();
1920                Threading::GetProcessTimes(
1921                    h_proc,
1922                    _create.as_mut_ptr(),
1923                    _exit.as_mut_ptr(),
1924                    kernel.as_mut_ptr(),
1925                    user.as_mut_ptr(),
1926                );
1927            }
1928
1929            let kernel = unsafe { kernel.assume_init() };
1930            let user = unsafe { user.assume_init() };
1931
1932            let times_result = TimesResultData {
1933                user: user.dwHighDateTime as f64 * 429.4967296 + user.dwLowDateTime as f64 * 1e-7,
1934                system: kernel.dwHighDateTime as f64 * 429.4967296
1935                    + kernel.dwLowDateTime as f64 * 1e-7,
1936                children_user: 0.0,
1937                children_system: 0.0,
1938                elapsed: 0.0,
1939            };
1940
1941            Ok(times_result.to_pyobject(vm))
1942        }
1943        #[cfg(unix)]
1944        {
1945            let mut t = libc::tms {
1946                tms_utime: 0,
1947                tms_stime: 0,
1948                tms_cutime: 0,
1949                tms_cstime: 0,
1950            };
1951
1952            let tick_for_second = unsafe { libc::sysconf(libc::_SC_CLK_TCK) } as f64;
1953            let c = unsafe { libc::times(&mut t as *mut _) };
1954
1955            // XXX: The signedness of `clock_t` varies from platform to platform.
1956            if c == (-1i8) as libc::clock_t {
1957                return Err(vm.new_os_error("Fail to get times".to_string()));
1958            }
1959
1960            let times_result = TimesResultData {
1961                user: t.tms_utime as f64 / tick_for_second,
1962                system: t.tms_stime as f64 / tick_for_second,
1963                children_user: t.tms_cutime as f64 / tick_for_second,
1964                children_system: t.tms_cstime as f64 / tick_for_second,
1965                elapsed: c as f64 / tick_for_second,
1966            };
1967
1968            Ok(times_result.to_pyobject(vm))
1969        }
1970    }
1971
1972    #[cfg(target_os = "linux")]
1973    #[derive(FromArgs)]
1974    struct CopyFileRangeArgs<'fd> {
1975        #[pyarg(positional)]
1976        src: crt_fd::Borrowed<'fd>,
1977        #[pyarg(positional)]
1978        dst: crt_fd::Borrowed<'fd>,
1979        #[pyarg(positional)]
1980        count: i64,
1981        #[pyarg(any, default)]
1982        offset_src: Option<crt_fd::Offset>,
1983        #[pyarg(any, default)]
1984        offset_dst: Option<crt_fd::Offset>,
1985    }
1986
1987    #[cfg(target_os = "linux")]
1988    #[pyfunction]
1989    fn copy_file_range(args: CopyFileRangeArgs<'_>, vm: &VirtualMachine) -> PyResult<usize> {
1990        #[allow(clippy::unnecessary_option_map_or_else)]
1991        let p_offset_src = args.offset_src.as_ref().map_or_else(core::ptr::null, |x| x);
1992        #[allow(clippy::unnecessary_option_map_or_else)]
1993        let p_offset_dst = args.offset_dst.as_ref().map_or_else(core::ptr::null, |x| x);
1994        let count: usize = args
1995            .count
1996            .try_into()
1997            .map_err(|_| vm.new_value_error("count should >= 0"))?;
1998
1999        // The flags argument is provided to allow
2000        // for future extensions and currently must be to 0.
2001        let flags = 0u32;
2002
2003        // Safety: p_offset_src and p_offset_dst is a unique pointer for offset_src and offset_dst respectively,
2004        // and will only be freed after this function ends.
2005        //
2006        // Why not use `libc::copy_file_range`: On `musl-libc`, `libc::copy_file_range` is not provided. Therefore
2007        // we use syscalls directly instead.
2008        let ret = unsafe {
2009            libc::syscall(
2010                libc::SYS_copy_file_range,
2011                args.src,
2012                p_offset_src as *mut i64,
2013                args.dst,
2014                p_offset_dst as *mut i64,
2015                count,
2016                flags,
2017            )
2018        };
2019
2020        usize::try_from(ret).map_err(|_| vm.new_last_errno_error())
2021    }
2022
2023    #[pyfunction]
2024    fn strerror(e: i32) -> String {
2025        unsafe { core::ffi::CStr::from_ptr(libc::strerror(e)) }
2026            .to_string_lossy()
2027            .into_owned()
2028    }
2029
2030    #[pyfunction]
2031    pub fn ftruncate(fd: crt_fd::Borrowed<'_>, length: crt_fd::Offset) -> io::Result<()> {
2032        crt_fd::ftruncate(fd, length)
2033    }
2034
2035    #[pyfunction]
2036    fn truncate(path: PyObjectRef, length: crt_fd::Offset, vm: &VirtualMachine) -> PyResult<()> {
2037        match path.clone().try_into_value::<crt_fd::Borrowed<'_>>(vm) {
2038            Ok(fd) => return ftruncate(fd, length).map_err(|e| e.into_pyexception(vm)),
2039            Err(e) if e.fast_isinstance(vm.ctx.exceptions.warning) => return Err(e),
2040            Err(_) => {}
2041        }
2042
2043        #[cold]
2044        fn error(
2045            vm: &VirtualMachine,
2046            error: std::io::Error,
2047            path: OsPath,
2048        ) -> crate::builtins::PyBaseExceptionRef {
2049            OSErrorBuilder::with_filename(&error, path, vm)
2050        }
2051
2052        let path = OsPath::try_from_object(vm, path)?;
2053        // TODO: just call libc::truncate() on POSIX
2054        let f = match OpenOptions::new().write(true).open(&path) {
2055            Ok(f) => f,
2056            Err(e) => return Err(error(vm, e, path)),
2057        };
2058        f.set_len(length as u64).map_err(|e| error(vm, e, path))?;
2059        drop(f);
2060        Ok(())
2061    }
2062
2063    #[cfg(all(unix, not(any(target_os = "redox", target_os = "android"))))]
2064    #[pyfunction]
2065    fn getloadavg(vm: &VirtualMachine) -> PyResult<(f64, f64, f64)> {
2066        let mut loadavg = [0f64; 3];
2067
2068        // Safety: loadavg is on stack and only write by `getloadavg` and are freed
2069        // after this function ends.
2070        unsafe {
2071            if libc::getloadavg(&mut loadavg[0] as *mut f64, 3) != 3 {
2072                return Err(vm.new_os_error("Load averages are unobtainable".to_string()));
2073            }
2074        }
2075
2076        Ok((loadavg[0], loadavg[1], loadavg[2]))
2077    }
2078
2079    #[cfg(unix)]
2080    #[pyfunction]
2081    fn waitstatus_to_exitcode(status: i32, vm: &VirtualMachine) -> PyResult<i32> {
2082        let status = u32::try_from(status)
2083            .map_err(|_| vm.new_value_error(format!("invalid WEXITSTATUS: {status}")))?;
2084
2085        let status = status as libc::c_int;
2086        if libc::WIFEXITED(status) {
2087            return Ok(libc::WEXITSTATUS(status));
2088        }
2089
2090        if libc::WIFSIGNALED(status) {
2091            return Ok(-libc::WTERMSIG(status));
2092        }
2093
2094        Err(vm.new_value_error(format!("Invalid wait status: {status}")))
2095    }
2096
2097    #[cfg(windows)]
2098    #[pyfunction]
2099    fn waitstatus_to_exitcode(status: u64, vm: &VirtualMachine) -> PyResult<u32> {
2100        let exitcode = status >> 8;
2101        // ExitProcess() accepts an UINT type:
2102        // reject exit code which doesn't fit in an UINT
2103        u32::try_from(exitcode)
2104            .map_err(|_| vm.new_value_error(format!("Invalid exit code: {exitcode}")))
2105    }
2106
2107    #[pyfunction]
2108    fn device_encoding(fd: i32, _vm: &VirtualMachine) -> PyResult<Option<String>> {
2109        if !isatty(fd) {
2110            return Ok(None);
2111        }
2112
2113        cfg_if::cfg_if! {
2114            if #[cfg(any(target_os = "android", target_os = "redox"))] {
2115                Ok(Some("UTF-8".to_owned()))
2116            } else if #[cfg(windows)] {
2117                use windows_sys::Win32::System::Console;
2118                let cp = match fd {
2119                    0 => unsafe { Console::GetConsoleCP() },
2120                    1 | 2 => unsafe { Console::GetConsoleOutputCP() },
2121                    _ => 0,
2122                };
2123
2124                Ok(Some(format!("cp{cp}")))
2125            } else {
2126                let encoding = unsafe {
2127                    let encoding = libc::nl_langinfo(libc::CODESET);
2128                    if encoding.is_null() || encoding.read() == b'\0' as libc::c_char {
2129                        "UTF-8".to_owned()
2130                    } else {
2131                        core::ffi::CStr::from_ptr(encoding).to_string_lossy().into_owned()
2132                    }
2133                };
2134
2135                Ok(Some(encoding))
2136            }
2137        }
2138    }
2139
2140    #[pystruct_sequence_data]
2141    #[allow(dead_code)]
2142    pub(crate) struct TerminalSizeData {
2143        pub columns: usize,
2144        pub lines: usize,
2145    }
2146
2147    #[pyattr]
2148    #[pystruct_sequence(name = "terminal_size", module = "os", data = "TerminalSizeData")]
2149    pub(crate) struct PyTerminalSize;
2150
2151    #[pyclass(with(PyStructSequence))]
2152    impl PyTerminalSize {}
2153
2154    #[derive(Debug)]
2155    #[pystruct_sequence_data]
2156    pub(crate) struct UnameResultData {
2157        pub sysname: String,
2158        pub nodename: String,
2159        pub release: String,
2160        pub version: String,
2161        pub machine: String,
2162    }
2163
2164    #[pyattr]
2165    #[pystruct_sequence(name = "uname_result", module = "os", data = "UnameResultData")]
2166    pub(crate) struct PyUnameResult;
2167
2168    #[pyclass(with(PyStructSequence))]
2169    impl PyUnameResult {}
2170
2171    // statvfs_result: Result from statvfs or fstatvfs.
2172    // = statvfs_result_fields
2173    #[cfg(all(unix, not(target_os = "redox")))]
2174    #[derive(Debug)]
2175    #[pystruct_sequence_data]
2176    pub(crate) struct StatvfsResultData {
2177        pub f_bsize: libc::c_ulong,     // filesystem block size
2178        pub f_frsize: libc::c_ulong,    // fragment size
2179        pub f_blocks: libc::fsblkcnt_t, // size of fs in f_frsize units
2180        pub f_bfree: libc::fsblkcnt_t,  // free blocks
2181        pub f_bavail: libc::fsblkcnt_t, // free blocks for unprivileged users
2182        pub f_files: libc::fsfilcnt_t,  // inodes
2183        pub f_ffree: libc::fsfilcnt_t,  // free inodes
2184        pub f_favail: libc::fsfilcnt_t, // free inodes for unprivileged users
2185        pub f_flag: libc::c_ulong,      // mount flags
2186        pub f_namemax: libc::c_ulong,   // maximum filename length
2187        #[pystruct_sequence(skip)]
2188        pub f_fsid: libc::c_ulong, // filesystem ID (not in tuple but accessible as attribute)
2189    }
2190
2191    #[cfg(all(unix, not(target_os = "redox")))]
2192    #[pyattr]
2193    #[pystruct_sequence(name = "statvfs_result", module = "os", data = "StatvfsResultData")]
2194    pub(crate) struct PyStatvfsResult;
2195
2196    #[cfg(all(unix, not(target_os = "redox")))]
2197    #[pyclass(with(PyStructSequence))]
2198    impl PyStatvfsResult {
2199        #[pyslot]
2200        fn slot_new(cls: PyTypeRef, args: FuncArgs, vm: &VirtualMachine) -> PyResult {
2201            let seq: PyObjectRef = args.bind(vm)?;
2202            crate::types::struct_sequence_new(cls, seq, vm)
2203        }
2204    }
2205
2206    #[cfg(all(unix, not(target_os = "redox")))]
2207    impl StatvfsResultData {
2208        fn from_statvfs(st: libc::statvfs) -> Self {
2209            // f_fsid is a struct on some platforms (e.g., Linux fsid_t) and a scalar on others.
2210            // We extract raw bytes and interpret as a native-endian integer.
2211            // Note: The value may differ across architectures due to endianness.
2212            let f_fsid = {
2213                let ptr = core::ptr::addr_of!(st.f_fsid) as *const u8;
2214                let size = core::mem::size_of_val(&st.f_fsid);
2215                if size >= 8 {
2216                    let bytes = unsafe { core::slice::from_raw_parts(ptr, 8) };
2217                    u64::from_ne_bytes([
2218                        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6],
2219                        bytes[7],
2220                    ]) as libc::c_ulong
2221                } else if size >= 4 {
2222                    let bytes = unsafe { core::slice::from_raw_parts(ptr, 4) };
2223                    u32::from_ne_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as libc::c_ulong
2224                } else {
2225                    0
2226                }
2227            };
2228
2229            Self {
2230                f_bsize: st.f_bsize,
2231                f_frsize: st.f_frsize,
2232                f_blocks: st.f_blocks,
2233                f_bfree: st.f_bfree,
2234                f_bavail: st.f_bavail,
2235                f_files: st.f_files,
2236                f_ffree: st.f_ffree,
2237                f_favail: st.f_favail,
2238                f_flag: st.f_flag,
2239                f_namemax: st.f_namemax,
2240                f_fsid,
2241            }
2242        }
2243    }
2244
2245    /// Perform a statvfs system call on the given path.
2246    #[cfg(all(unix, not(target_os = "redox")))]
2247    #[pyfunction]
2248    #[pyfunction(name = "fstatvfs")]
2249    fn statvfs(path: OsPathOrFd<'_>, vm: &VirtualMachine) -> PyResult {
2250        let mut st: libc::statvfs = unsafe { core::mem::zeroed() };
2251        let ret = match &path {
2252            OsPathOrFd::Path(p) => {
2253                let cpath = p.clone().into_cstring(vm)?;
2254                unsafe { libc::statvfs(cpath.as_ptr(), &mut st) }
2255            }
2256            OsPathOrFd::Fd(fd) => unsafe { libc::fstatvfs(fd.as_raw(), &mut st) },
2257        };
2258        if ret != 0 {
2259            return Err(OSErrorBuilder::with_filename(
2260                &io::Error::last_os_error(),
2261                path,
2262                vm,
2263            ));
2264        }
2265        Ok(StatvfsResultData::from_statvfs(st).to_pyobject(vm))
2266    }
2267
2268    pub(super) fn support_funcs() -> Vec<SupportFunc> {
2269        let mut supports = super::platform::module::support_funcs();
2270        supports.extend(vec![
2271            SupportFunc::new("open", Some(false), Some(OPEN_DIR_FD), Some(false)),
2272            SupportFunc::new("access", Some(false), Some(false), None),
2273            SupportFunc::new("chdir", None, Some(false), Some(false)),
2274            // chflags Some, None Some
2275            SupportFunc::new("link", Some(false), Some(false), Some(cfg!(unix))),
2276            SupportFunc::new("listdir", Some(LISTDIR_FD), Some(false), Some(false)),
2277            SupportFunc::new("mkdir", Some(false), Some(MKDIR_DIR_FD), Some(false)),
2278            // mkfifo Some Some None
2279            // mknod Some Some None
2280            SupportFunc::new("readlink", Some(false), None, Some(false)),
2281            SupportFunc::new("remove", Some(false), Some(UNLINK_DIR_FD), Some(false)),
2282            SupportFunc::new("unlink", Some(false), Some(UNLINK_DIR_FD), Some(false)),
2283            SupportFunc::new("rename", Some(false), None, Some(false)),
2284            SupportFunc::new("replace", Some(false), None, Some(false)), // TODO: Fix replace
2285            SupportFunc::new("rmdir", Some(false), Some(RMDIR_DIR_FD), Some(false)),
2286            SupportFunc::new("scandir", Some(SCANDIR_FD), Some(false), Some(false)),
2287            SupportFunc::new("stat", Some(true), Some(STAT_DIR_FD), Some(true)),
2288            SupportFunc::new("fstat", Some(true), Some(STAT_DIR_FD), Some(true)),
2289            SupportFunc::new("symlink", Some(false), Some(SYMLINK_DIR_FD), Some(false)),
2290            SupportFunc::new("truncate", Some(true), Some(false), Some(false)),
2291            SupportFunc::new("ftruncate", Some(true), Some(false), Some(false)),
2292            SupportFunc::new("fsync", Some(true), Some(false), Some(false)),
2293            SupportFunc::new(
2294                "utime",
2295                Some(false),
2296                Some(UTIME_DIR_FD),
2297                Some(cfg!(all(unix, not(target_os = "redox")))),
2298            ),
2299        ]);
2300        supports
2301    }
2302}
2303pub(crate) use _os::{ftruncate, isatty, lseek};
2304
2305pub(crate) struct SupportFunc {
2306    name: &'static str,
2307    // realistically, each of these is just a bool of "is this function in the supports_* set".
2308    // However, None marks that the function maybe _should_ support fd/dir_fd/follow_symlinks, but
2309    // we haven't implemented it yet.
2310    fd: Option<bool>,
2311    dir_fd: Option<bool>,
2312    follow_symlinks: Option<bool>,
2313}
2314
2315impl SupportFunc {
2316    pub(crate) const fn new(
2317        name: &'static str,
2318        fd: Option<bool>,
2319        dir_fd: Option<bool>,
2320        follow_symlinks: Option<bool>,
2321    ) -> Self {
2322        Self {
2323            name,
2324            fd,
2325            dir_fd,
2326            follow_symlinks,
2327        }
2328    }
2329}
2330
2331pub fn module_exec(vm: &VirtualMachine, module: &Py<PyModule>) -> PyResult<()> {
2332    let support_funcs = _os::support_funcs();
2333    let supports_fd = PySet::default().into_ref(&vm.ctx);
2334    let supports_dir_fd = PySet::default().into_ref(&vm.ctx);
2335    let supports_follow_symlinks = PySet::default().into_ref(&vm.ctx);
2336    for support in support_funcs {
2337        let func_obj = module.get_attr(support.name, vm)?;
2338        if support.fd.unwrap_or(false) {
2339            supports_fd.clone().add(func_obj.clone(), vm)?;
2340        }
2341        if support.dir_fd.unwrap_or(false) {
2342            supports_dir_fd.clone().add(func_obj.clone(), vm)?;
2343        }
2344        if support.follow_symlinks.unwrap_or(false) {
2345            supports_follow_symlinks.clone().add(func_obj, vm)?;
2346        }
2347    }
2348
2349    extend_module!(vm, module, {
2350        "supports_fd" => supports_fd,
2351        "supports_dir_fd" => supports_dir_fd,
2352        "supports_follow_symlinks" => supports_follow_symlinks,
2353        "error" => vm.ctx.exceptions.os_error.to_owned(),
2354    });
2355
2356    Ok(())
2357}
2358
2359/// Convert a mapping (e.g. os._Environ) to a plain dict for use by execve/posix_spawn.
2360///
2361/// For `os._Environ`, accesses the internal `_data` dict directly at the Rust level.
2362/// This avoids Python-level method calls that can deadlock after fork() when
2363/// parking_lot locks are held by threads that no longer exist.
2364#[cfg(any(unix, windows))]
2365pub(crate) fn envobj_to_dict(
2366    env: crate::function::ArgMapping,
2367    vm: &VirtualMachine,
2368) -> PyResult<crate::builtins::PyDictRef> {
2369    let obj = env.obj();
2370    if let Some(dict) = obj.downcast_ref_if_exact::<crate::builtins::PyDict>(vm) {
2371        return Ok(dict.to_owned());
2372    }
2373    if let Some(inst_dict) = obj.dict()
2374        && let Ok(Some(data)) = inst_dict.get_item_opt("_data", vm)
2375        && let Some(dict) = data.downcast_ref_if_exact::<crate::builtins::PyDict>(vm)
2376    {
2377        return Ok(dict.to_owned());
2378    }
2379    let keys = vm.call_method(obj, "keys", ())?;
2380    let dict = vm.ctx.new_dict();
2381    for key in keys.get_iter(vm)?.into_iter::<PyObjectRef>(vm)? {
2382        let key = key?;
2383        let val = obj.get_item(&*key, vm)?;
2384        dict.set_item(&*key, val, vm)?;
2385    }
2386    Ok(dict)
2387}
2388
2389#[cfg(not(windows))]
2390use super::posix as platform;
2391
2392#[cfg(windows)]
2393use super::nt as platform;
2394
2395pub(crate) use platform::module::MODULE_NAME;