Skip to main content

rustpython_vm/
ospath.rs

1use rustpython_common::crt_fd;
2
3use crate::{
4    AsObject, PyObjectRef, PyResult, VirtualMachine,
5    builtins::{PyBytes, PyStr},
6    class::StaticType,
7    convert::{IntoPyException, ToPyException, ToPyObject, TryFromObject},
8    function::FsPath,
9};
10use std::path::{Path, PathBuf};
11
/// Configuration for converting a Python object into a filesystem path
/// (the counterpart of CPython's `path_converter`).
///
/// The optional names are only used to decorate error messages.
#[derive(Clone, Copy, Default)]
pub struct PathConverter {
    /// Name of the calling function, used as an error-message prefix
    /// (e.g., "rename"). No prefix is emitted when this is `None`.
    pub function_name: Option<&'static str>,
    /// Name of the argument being converted, used in error messages
    /// (e.g., "src", "dst").
    pub argument_name: Option<&'static str>,
    /// When `true`, embedded null characters are allowed.
    pub non_strict: bool,
}
22
23impl PathConverter {
24    pub const fn new() -> Self {
25        Self {
26            function_name: None,
27            argument_name: None,
28            non_strict: false,
29        }
30    }
31
32    pub const fn function(mut self, name: &'static str) -> Self {
33        self.function_name = Some(name);
34        self
35    }
36
37    pub const fn argument(mut self, name: &'static str) -> Self {
38        self.argument_name = Some(name);
39        self
40    }
41
42    pub const fn non_strict(mut self) -> Self {
43        self.non_strict = true;
44        self
45    }
46
47    /// Generate error message prefix like "rename: "
48    fn error_prefix(&self) -> String {
49        match self.function_name {
50            Some(func) => format!("{}: ", func),
51            None => String::new(),
52        }
53    }
54
55    /// Get argument name for error messages, defaults to "path"
56    fn arg_name(&self) -> &'static str {
57        self.argument_name.unwrap_or("path")
58    }
59
60    /// Format a type error message
61    fn type_error_msg(&self, type_name: &str, allow_fd: bool) -> String {
62        let expected = if allow_fd {
63            "string, bytes, os.PathLike or integer"
64        } else {
65            "string, bytes or os.PathLike"
66        };
67        format!(
68            "{}{} should be {}, not {}",
69            self.error_prefix(),
70            self.arg_name(),
71            expected,
72            type_name
73        )
74    }
75
76    /// Convert to OsPathOrFd (path or file descriptor)
77    pub(crate) fn try_path_or_fd<'fd>(
78        &self,
79        obj: PyObjectRef,
80        vm: &VirtualMachine,
81    ) -> PyResult<OsPathOrFd<'fd>> {
82        // Handle fd (before __fspath__ check, like CPython)
83        if let Some(int) = obj.try_index_opt(vm) {
84            // Warn if bool is used as a file descriptor
85            if obj
86                .class()
87                .is(crate::builtins::bool_::PyBool::static_type())
88            {
89                crate::stdlib::_warnings::warn(
90                    vm.ctx.exceptions.runtime_warning,
91                    "bool is used as a file descriptor".to_owned(),
92                    1,
93                    vm,
94                )?;
95            }
96            let fd = int?.try_to_primitive(vm)?;
97            return unsafe { crt_fd::Borrowed::try_borrow_raw(fd) }
98                .map(OsPathOrFd::Fd)
99                .map_err(|e| e.into_pyexception(vm));
100        }
101
102        self.try_path_inner(obj, true, vm).map(OsPathOrFd::Path)
103    }
104
105    /// Convert to OsPath only (no fd support)
106    fn try_path_inner(
107        &self,
108        obj: PyObjectRef,
109        allow_fd: bool,
110        vm: &VirtualMachine,
111    ) -> PyResult<OsPath> {
112        // Try direct str/bytes match
113        let obj = match self.try_match_str_bytes(obj.clone(), vm)? {
114            Ok(path) => return Ok(path),
115            Err(obj) => obj,
116        };
117
118        // Call __fspath__
119        let type_error_msg = || self.type_error_msg(&obj.class().name(), allow_fd);
120        let method =
121            vm.get_method_or_type_error(obj.clone(), identifier!(vm, __fspath__), type_error_msg)?;
122        if vm.is_none(&method) {
123            return Err(vm.new_type_error(type_error_msg()));
124        }
125        let result = method.call((), vm)?;
126
127        // Match __fspath__ result
128        self.try_match_str_bytes(result.clone(), vm)?.map_err(|_| {
129            vm.new_type_error(format!(
130                "{}expected {}.__fspath__() to return str or bytes, not {}",
131                self.error_prefix(),
132                obj.class().name(),
133                result.class().name(),
134            ))
135        })
136    }
137
138    /// Try to match str or bytes, returns Err(obj) if neither
139    fn try_match_str_bytes(
140        &self,
141        obj: PyObjectRef,
142        vm: &VirtualMachine,
143    ) -> PyResult<Result<OsPath, PyObjectRef>> {
144        let check_nul = |b: &[u8]| {
145            if self.non_strict || memchr::memchr(b'\0', b).is_none() {
146                Ok(())
147            } else {
148                Err(vm.new_value_error(format!(
149                    "{}embedded null character in {}",
150                    self.error_prefix(),
151                    self.arg_name()
152                )))
153            }
154        };
155
156        match_class!(match obj {
157            s @ PyStr => {
158                check_nul(s.as_bytes())?;
159                let path = vm.fsencode(&s)?.into_owned();
160                Ok(Ok(OsPath {
161                    path,
162                    origin: Some(s.into()),
163                }))
164            }
165            b @ PyBytes => {
166                check_nul(&b)?;
167                let path = FsPath::bytes_as_os_str(&b, vm)?.to_owned();
168                Ok(Ok(OsPath {
169                    path,
170                    origin: Some(b.into()),
171                }))
172            }
173            obj => Ok(Err(obj)),
174        })
175    }
176
177    /// Convert to OsPath directly
178    pub fn try_path(&self, obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<OsPath> {
179        self.try_path_inner(obj, false, vm)
180    }
181}
182
183/// path_t output - the converted path
184#[derive(Clone)]
185pub struct OsPath {
186    pub path: std::ffi::OsString,
187    /// Original Python object for identity preservation in OSError
188    pub(super) origin: Option<PyObjectRef>,
189}
190
/// Selects the Python type used when a path is handed back to Python code.
#[derive(Debug, Copy, Clone)]
pub enum OutputMode {
    /// Produce a decoded string.
    String,
    /// Produce a bytes object.
    Bytes,
}
196
197impl OutputMode {
198    pub(super) fn process_path(self, path: impl Into<PathBuf>, vm: &VirtualMachine) -> PyObjectRef {
199        fn inner(mode: OutputMode, path: PathBuf, vm: &VirtualMachine) -> PyObjectRef {
200            match mode {
201                OutputMode::String => vm.fsdecode(path).into(),
202                OutputMode::Bytes => vm
203                    .ctx
204                    .new_bytes(path.into_os_string().into_encoded_bytes())
205                    .into(),
206            }
207        }
208        inner(self, path.into(), vm)
209    }
210}
211
212impl OsPath {
213    pub fn new_str(path: impl Into<std::ffi::OsString>) -> Self {
214        let path = path.into();
215        Self { path, origin: None }
216    }
217
218    pub(crate) fn from_fspath(fspath: FsPath, vm: &VirtualMachine) -> PyResult<Self> {
219        let path = fspath.as_os_str(vm)?.into_owned();
220        let origin = match fspath {
221            FsPath::Str(s) => s.into(),
222            FsPath::Bytes(b) => b.into(),
223        };
224        Ok(Self {
225            path,
226            origin: Some(origin),
227        })
228    }
229
230    /// Convert an object to OsPath using the os.fspath-style error message.
231    /// This is used by open() which should report "expected str, bytes or os.PathLike object, not"
232    /// instead of "should be string, bytes or os.PathLike, not".
233    pub(crate) fn try_from_fspath(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<Self> {
234        let fspath = FsPath::try_from_path_like(obj, true, vm)?;
235        Self::from_fspath(fspath, vm)
236    }
237
238    pub fn as_path(&self) -> &Path {
239        Path::new(&self.path)
240    }
241
242    pub fn into_bytes(self) -> Vec<u8> {
243        self.path.into_encoded_bytes()
244    }
245
246    pub fn to_string_lossy(&self) -> alloc::borrow::Cow<'_, str> {
247        self.path.to_string_lossy()
248    }
249
250    pub fn into_cstring(self, vm: &VirtualMachine) -> PyResult<alloc::ffi::CString> {
251        alloc::ffi::CString::new(self.into_bytes()).map_err(|err| err.to_pyexception(vm))
252    }
253
254    #[cfg(windows)]
255    pub fn to_wide_cstring(&self, vm: &VirtualMachine) -> PyResult<widestring::WideCString> {
256        widestring::WideCString::from_os_str(&self.path).map_err(|err| err.to_pyexception(vm))
257    }
258
259    pub fn filename(&self, vm: &VirtualMachine) -> PyObjectRef {
260        if let Some(ref origin) = self.origin {
261            origin.clone()
262        } else {
263            // Default to string when no origin (e.g., from new_str)
264            OutputMode::String.process_path(self.path.clone(), vm)
265        }
266    }
267
268    /// Get the output mode based on origin type (bytes -> Bytes, otherwise -> String)
269    pub fn mode(&self) -> OutputMode {
270        match &self.origin {
271            Some(obj) if obj.downcast_ref::<PyBytes>().is_some() => OutputMode::Bytes,
272            _ => OutputMode::String,
273        }
274    }
275}
276
277impl AsRef<Path> for OsPath {
278    fn as_ref(&self) -> &Path {
279        self.as_path()
280    }
281}
282
283impl TryFromObject for OsPath {
284    fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
285        PathConverter::new().try_path(obj, vm)
286    }
287}
288
289// path_t with allow_fd in CPython
290#[derive(Clone)]
291pub(crate) enum OsPathOrFd<'fd> {
292    Path(OsPath),
293    Fd(crt_fd::Borrowed<'fd>),
294}
295
296impl TryFromObject for OsPathOrFd<'_> {
297    fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
298        PathConverter::new().try_path_or_fd(obj, vm)
299    }
300}
301
302impl From<OsPath> for OsPathOrFd<'_> {
303    fn from(path: OsPath) -> Self {
304        Self::Path(path)
305    }
306}
307
308impl OsPathOrFd<'_> {
309    pub fn filename(&self, vm: &VirtualMachine) -> PyObjectRef {
310        match self {
311            Self::Path(path) => path.filename(vm),
312            Self::Fd(fd) => fd.to_pyobject(vm),
313        }
314    }
315}
316
317impl crate::exceptions::OSErrorBuilder {
318    #[must_use]
319    pub(crate) fn with_filename<'a>(
320        error: &std::io::Error,
321        filename: impl Into<OsPathOrFd<'a>>,
322        vm: &VirtualMachine,
323    ) -> crate::builtins::PyBaseExceptionRef {
324        // TODO: return type to PyRef<PyOSError>
325        use crate::exceptions::ToOSErrorBuilder;
326        let builder = error.to_os_error_builder(vm);
327        let builder = builder.filename(filename.into().filename(vm));
328        builder.build(vm).upcast()
329    }
330
331    /// Like `with_filename`, but strips winerror on Windows.
332    /// Use for C runtime errors (open, fstat, etc.) that should produce
333    /// `[Errno X]` format instead of `[WinError X]`.
334    #[must_use]
335    pub(crate) fn with_filename_from_errno<'a>(
336        error: &std::io::Error,
337        filename: impl Into<OsPathOrFd<'a>>,
338        vm: &VirtualMachine,
339    ) -> crate::builtins::PyBaseExceptionRef {
340        use crate::exceptions::ToOSErrorBuilder;
341        let builder = error.to_os_error_builder(vm);
342        #[cfg(windows)]
343        let builder = builder.without_winerror();
344        let builder = builder.filename(filename.into().filename(vm));
345        builder.build(vm).upcast()
346    }
347}