rustpython_vm/function/
buffer.rs1use crate::{
2 AsObject, PyObject, PyObjectRef, PyResult, TryFromBorrowedObject, TryFromObject,
3 VirtualMachine,
4 builtins::{PyStr, PyStrRef},
5 common::borrow::{BorrowedValue, BorrowedValueMut},
6 protocol::PyBuffer,
7};
8
9#[derive(Debug, Traverse)]
13pub struct ArgBytesLike(PyBuffer);
14
15impl PyObject {
16 pub fn try_bytes_like<R>(
17 &self,
18 vm: &VirtualMachine,
19 f: impl FnOnce(&[u8]) -> R,
20 ) -> PyResult<R> {
21 let buffer = PyBuffer::try_from_borrowed_object(vm, self)?;
22 buffer
23 .as_contiguous()
24 .map(|x| f(&x))
25 .ok_or_else(|| vm.new_buffer_error("non-contiguous buffer is not a bytes-like object"))
26 }
27
28 pub fn try_rw_bytes_like<R>(
29 &self,
30 vm: &VirtualMachine,
31 f: impl FnOnce(&mut [u8]) -> R,
32 ) -> PyResult<R> {
33 let buffer = PyBuffer::try_from_borrowed_object(vm, self)?;
34 buffer
35 .as_contiguous_mut()
36 .map(|mut x| f(&mut x))
37 .ok_or_else(|| vm.new_type_error("buffer is not a read-write bytes-like object"))
38 }
39}
40
41impl ArgBytesLike {
42 pub fn borrow_buf(&self) -> BorrowedValue<'_, [u8]> {
43 unsafe { self.0.contiguous_unchecked() }
44 }
45
46 pub fn with_ref<F, R>(&self, f: F) -> R
47 where
48 F: FnOnce(&[u8]) -> R,
49 {
50 f(&self.borrow_buf())
51 }
52
53 pub const fn len(&self) -> usize {
54 self.0.desc.len
55 }
56
57 pub const fn is_empty(&self) -> bool {
58 self.len() == 0
59 }
60
61 pub fn as_object(&self) -> &PyObject {
62 &self.0.obj
63 }
64}
65
66impl From<ArgBytesLike> for PyBuffer {
67 fn from(buffer: ArgBytesLike) -> Self {
68 buffer.0
69 }
70}
71
72impl From<ArgBytesLike> for PyObjectRef {
73 fn from(buffer: ArgBytesLike) -> Self {
74 buffer.as_object().to_owned()
75 }
76}
77
78impl<'a> TryFromBorrowedObject<'a> for ArgBytesLike {
79 fn try_from_borrowed_object(vm: &VirtualMachine, obj: &'a PyObject) -> PyResult<Self> {
80 let buffer = PyBuffer::try_from_borrowed_object(vm, obj)?;
81 if buffer.desc.is_contiguous() {
82 Ok(Self(buffer))
83 } else {
84 Err(vm.new_buffer_error("non-contiguous buffer is not a bytes-like object"))
85 }
86 }
87}
88
89#[derive(Debug, Traverse)]
91pub struct ArgMemoryBuffer(PyBuffer);
92
93impl ArgMemoryBuffer {
94 pub fn borrow_buf_mut(&self) -> BorrowedValueMut<'_, [u8]> {
95 unsafe { self.0.contiguous_mut_unchecked() }
96 }
97
98 pub fn with_ref<F, R>(&self, f: F) -> R
99 where
100 F: FnOnce(&mut [u8]) -> R,
101 {
102 f(&mut self.borrow_buf_mut())
103 }
104
105 pub const fn len(&self) -> usize {
106 self.0.desc.len
107 }
108
109 pub const fn is_empty(&self) -> bool {
110 self.len() == 0
111 }
112}
113
114impl From<ArgMemoryBuffer> for PyBuffer {
115 fn from(buffer: ArgMemoryBuffer) -> Self {
116 buffer.0
117 }
118}
119
120impl<'a> TryFromBorrowedObject<'a> for ArgMemoryBuffer {
121 fn try_from_borrowed_object(vm: &VirtualMachine, obj: &'a PyObject) -> PyResult<Self> {
122 let buffer = PyBuffer::try_from_borrowed_object(vm, obj)?;
123 if !buffer.desc.is_contiguous() {
124 Err(vm.new_buffer_error("non-contiguous buffer is not a bytes-like object"))
125 } else if buffer.desc.readonly {
126 Err(vm.new_type_error("buffer is not a read-write bytes-like object"))
127 } else {
128 Ok(Self(buffer))
129 }
130 }
131}
132
133pub enum ArgStrOrBytesLike {
135 Buf(ArgBytesLike),
136 Str(PyStrRef),
137}
138
139impl ArgStrOrBytesLike {
140 pub fn as_object(&self) -> &PyObject {
141 match self {
142 Self::Buf(b) => b.as_object(),
143 Self::Str(s) => s.as_object(),
144 }
145 }
146}
147
148impl TryFromObject for ArgStrOrBytesLike {
149 fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
150 obj.downcast()
151 .map(Self::Str)
152 .or_else(|obj| ArgBytesLike::try_from_object(vm, obj).map(Self::Buf))
153 }
154}
155
156impl ArgStrOrBytesLike {
157 pub fn borrow_bytes(&self) -> BorrowedValue<'_, [u8]> {
158 match self {
159 Self::Buf(b) => b.borrow_buf(),
160 Self::Str(s) => s.as_bytes().into(),
161 }
162 }
163}
164
165#[derive(Debug)]
166pub enum ArgAsciiBuffer {
167 String(PyStrRef),
168 Buffer(ArgBytesLike),
169}
170
171impl TryFromObject for ArgAsciiBuffer {
172 fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
173 match obj.downcast::<PyStr>() {
174 Ok(string) => {
175 if string.as_wtf8().is_ascii() {
176 Ok(Self::String(string))
177 } else {
178 Err(vm.new_value_error("string argument should contain only ASCII characters"))
179 }
180 }
181 Err(obj) => ArgBytesLike::try_from_object(vm, obj).map(ArgAsciiBuffer::Buffer),
182 }
183 }
184}
185
186impl ArgAsciiBuffer {
187 pub fn len(&self) -> usize {
188 match self {
189 Self::String(s) => s.as_wtf8().len(),
190 Self::Buffer(buffer) => buffer.len(),
191 }
192 }
193
194 pub fn is_empty(&self) -> bool {
195 self.len() == 0
196 }
197
198 #[inline]
199 pub fn with_ref<R>(&self, f: impl FnOnce(&[u8]) -> R) -> R {
200 match self {
201 Self::String(s) => f(s.as_bytes()),
202 Self::Buffer(buffer) => buffer.with_ref(f),
203 }
204 }
205}