Skip to main content

zng_task/channel/
ipc_bytes_cast.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, io, iter::FusedIterator, marker::PhantomData, ops};
4
5use crate::channel::{IpcBytes, IpcBytesIntoIter, IpcBytesMut};
6
7/// Safe bytemuck casting wrapper for [`IpcBytesMut`].
8///
9/// Use [`IpcBytesMut::cast`] to cast.
10pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
11    bytes: IpcBytesMut,
12    _t: PhantomData<T>,
13}
14impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
15    type Target = [T];
16
17    fn deref(&self) -> &Self::Target {
18        bytemuck::cast_slice::<u8, T>(&self.bytes)
19    }
20}
21impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
22    fn deref_mut(&mut self) -> &mut Self::Target {
23        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
24    }
25}
26impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
27    /// Convert back to [`IpcBytesMut`].
28    pub fn into_inner(self) -> IpcBytesMut {
29        self.bytes
30    }
31}
32impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
33    fn from(value: IpcBytesMutCast<T>) -> Self {
34        value.bytes
35    }
36}
/// Convert a length in items of `T` to a length in bytes.
///
/// Returns a [`io::ErrorKind::FileTooLarge`] error if the byte length does not fit in `usize`.
fn item_len_to_bytes<T: 'static>(len: usize) -> io::Result<usize> {
    len.checked_mul(size_of::<T>())
        .ok_or_else(|| io::Error::new(io::ErrorKind::FileTooLarge, "cannot map more than usize::MAX"))
}
43impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
44    /// Allocate zeroed mutable memory.
45    pub async fn new(len: usize) -> io::Result<Self> {
46        IpcBytesMut::new(item_len_to_bytes::<T>(len)?).await.map(IpcBytesMut::cast)
47    }
48
49    /// Allocate zeroed mutable memory.
50    pub fn new_blocking(len: usize) -> io::Result<Self> {
51        IpcBytesMut::new_blocking(item_len_to_bytes::<T>(len)?).map(IpcBytesMut::cast)
52    }
53
54    /// Allocate zeroed mutable memory in a memory map.
55    ///
56    /// Note that [`new`] automatically selects the best memory storage for the given `len`, this
57    /// function enforces the usage of a memory map, the slowest of the options.
58    ///
59    /// [`new`]: Self::new
60    #[cfg(ipc)]
61    pub async fn new_memmap(len: usize) -> io::Result<Self> {
62        IpcBytesMut::new_memmap(item_len_to_bytes::<T>(len)?).await.map(IpcBytesMut::cast)
63    }
64
65    /// Allocate zeroed mutable memory in a memory map.
66    ///
67    /// Note that [`new_blocking`] automatically selects the best memory storage for the given `len`, this
68    /// function enforces the usage of a memory map, the slowest of the options.
69    ///
70    /// [`new_blocking`]: Self::new_blocking
71    #[cfg(ipc)]
72    pub fn new_memmap_blocking(len: usize) -> io::Result<Self> {
73        IpcBytesMut::new_memmap_blocking(item_len_to_bytes::<T>(len)?).map(IpcBytesMut::cast)
74    }
75
76    /// Uses `buf` or copies it to exclusive mutable memory.
77    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
78        IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
79    }
80
81    /// Uses `buf` or copies it to exclusive mutable memory.
82    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
83        IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
84    }
85
86    /// Copy data from slice.
87    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
88        IpcBytesMut::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytesMut::cast)
89    }
90
91    /// Reference the underlying raw bytes.
92    pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
93        &mut self.bytes
94    }
95}
96impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
97    /// Convert to immutable shareable [`IpcBytesCast`].
98    pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
99        self.bytes.finish().await.map(IpcBytes::cast)
100    }
101
102    /// Convert to immutable shareable [`IpcBytesCast`].
103    pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
104        self.bytes.finish_blocking().map(IpcBytes::cast)
105    }
106}
107
108impl IpcBytesMut {
109    /// Safe bytemuck casting wrapper.
110    ///
111    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytesMust`.
112    ///
113    /// # Panics
114    ///
115    /// Panics if cannot cast, se [bytemuck docs] for details.
116    ///
117    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
118    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
119        let r = IpcBytesMutCast {
120            bytes: self,
121            _t: PhantomData,
122        };
123        let _assert = &r[..];
124        r
125    }
126
127    /// Safe bytemuck cast to slice.
128    ///
129    /// # Panics
130    ///
131    /// Panics if cannot cast, se [bytemuck docs] for details.
132    ///
133    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
134    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
135        bytemuck::cast_slice(self)
136    }
137
138    /// Safe bytemuck cast to mutable slice.
139    ///
140    /// # Panics
141    ///
142    /// Panics if cannot cast, se [bytemuck docs] for details.
143    ///
144    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
145    pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
146        bytemuck::cast_slice_mut(self)
147    }
148}
149
150/// Safe bytemuck casting wrapper for [`IpcBytes`].
151///
152/// Use [`IpcBytes::cast`] to cast.
153pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
154    bytes: IpcBytes,
155    _t: PhantomData<T>,
156}
157impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
158    fn default() -> Self {
159        Self {
160            bytes: Default::default(),
161            _t: PhantomData,
162        }
163    }
164}
165impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
166    type Target = [T];
167
168    fn deref(&self) -> &Self::Target {
169        bytemuck::cast_slice::<u8, T>(&self.bytes)
170    }
171}
172impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
173    /// Convert back to [`IpcBytes`].
174    pub fn into_inner(self) -> IpcBytes {
175        self.bytes
176    }
177}
178impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
179    fn from(value: IpcBytesCast<T>) -> Self {
180        value.bytes
181    }
182}
183impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
184    fn clone(&self) -> Self {
185        Self {
186            bytes: self.bytes.clone(),
187            _t: PhantomData,
188        }
189    }
190}
191impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
192    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193        write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
194    }
195}
196impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
197    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198    where
199        S: serde::Serializer,
200    {
201        self.bytes.serialize(serializer)
202    }
203}
204impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
205    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
206    where
207        D: serde::Deserializer<'de>,
208    {
209        let bytes = IpcBytes::deserialize(deserializer)?;
210        Ok(bytes.cast())
211    }
212}
213impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
214    fn eq(&self, other: &Self) -> bool {
215        self.bytes == other.bytes
216    }
217}
218impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
219impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
220    /// Copy or move data from vector.
221    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
222        IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
223    }
224
225    /// Copy data from the iterator.
226    ///
227    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
228    /// will collect to an [`IpcBytesWriter`] that can reallocate multiple times as the buffer grows.
229    ///
230    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
231    /// is used, if it continues after the given maximum it is clipped.
232    ///
233    /// [`size_hint`]: Iterator::size_hint
234    /// [`IpcBytesWriter`]: crate::channel::IpcBytesWriter
235    pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
236        #[cfg(ipc)]
237        {
238            let (min, max) = iter.size_hint();
239            let l = size_of::<T>();
240            let min = min * l;
241            let max = max.map(|m| m * l);
242            if let Some(max) = max {
243                if max <= IpcBytes::INLINE_MAX {
244                    return Self::from_vec(iter.collect()).await;
245                } else if max == min {
246                    let mut r = IpcBytesMut::new(max).await?;
247                    let mut actual_len = 0;
248                    for (i, f) in r.chunks_exact_mut(l).zip(iter) {
249                        i.copy_from_slice(bytemuck::bytes_of(&f));
250                        actual_len += 1;
251                    }
252                    r.truncate(actual_len * l);
253                    return r.finish().await.map(IpcBytes::cast);
254                }
255            }
256
257            let mut writer = IpcBytes::new_writer().await;
258            for f in iter {
259                use futures_lite::AsyncWriteExt as _;
260
261                writer.write_all(bytemuck::bytes_of(&f)).await?;
262            }
263            writer.finish().await.map(IpcBytes::cast)
264        }
265        #[cfg(not(ipc))]
266        {
267            Self::from_vec(iter.collect()).await
268        }
269    }
270
271    /// Copy or move data from vector.
272    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
273        IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
274    }
275
276    /// Copy data from slice.
277    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
278        IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
279    }
280
281    /// Copy data from the iterator.
282    ///
283    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
284    /// will collect to an [`IpcBytesWriterBlocking`] that can reallocate multiple times as the buffer grows.
285    ///
286    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
287    /// is used, if it continues after the given maximum it is clipped.
288    ///
289    /// [`size_hint`]: Iterator::size_hint
290    /// [`IpcBytesWriterBlocking`]: crate::channel::IpcBytesWriterBlocking
291    pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
292        #[cfg(ipc)]
293        {
294            let (min, max) = iter.size_hint();
295            let l = size_of::<T>();
296            let min = min * l;
297            let max = max.map(|m| m * l);
298            if let Some(max) = max {
299                if max <= IpcBytes::INLINE_MAX {
300                    return Self::from_vec_blocking(iter.collect());
301                } else if max == min {
302                    let mut r = IpcBytesMut::new_blocking(max)?;
303                    let mut actual_len = 0;
304                    for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
305                        i.copy_from_slice(bytemuck::bytes_of(&f));
306                        actual_len += 1;
307                    }
308                    r.truncate(actual_len * l);
309                    return r.finish_blocking().map(IpcBytes::cast);
310                }
311            }
312
313            let mut writer = IpcBytes::new_writer_blocking();
314            for f in iter {
315                use std::io::Write as _;
316
317                writer.write_all(bytemuck::bytes_of(&f))?;
318            }
319            writer.finish().map(IpcBytes::cast)
320        }
321        #[cfg(not(ipc))]
322        {
323            Self::from_vec_blocking(iter.collect())
324        }
325    }
326
327    /// Reference the underlying raw bytes.
328    pub fn as_bytes(&self) -> &IpcBytes {
329        &self.bytes
330    }
331}
332
333impl IpcBytes {
334    /// Safe bytemuck casting wrapper.
335    ///
336    /// The wrapper will deref to `[T]` and can be converted back to `IpcBytes`.
337    ///
338    /// # Panics
339    ///
340    /// Panics if cannot cast, se [bytemuck docs] for details.
341    ///
342    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
343    pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
344        let r = IpcBytesCast {
345            bytes: self,
346            _t: PhantomData,
347        };
348        let _assert = &r[..];
349        r
350    }
351
352    /// Safe bytemuck cast to slice.
353    ///
354    /// # Panics
355    ///
356    /// Panics if cannot cast, se [bytemuck docs] for details.
357    ///
358    /// [bytemuck docs]: https://docs.rs/bytemuck/1.24.0/bytemuck/fn.try_cast_slice.html
359    pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
360        bytemuck::cast_slice(self)
361    }
362}
363
364/// An [`IpcBytesCast`] iterator that holds a strong reference to it.
365pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
366impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
367    fn new(bytes: IpcBytesCast<T>) -> Self {
368        Self(bytes.bytes.clone().into_iter(), bytes)
369    }
370
371    /// The source bytes.
372    pub fn source(&self) -> &IpcBytesCast<T> {
373        &self.1
374    }
375
376    /// Items not yet iterated.
377    pub fn rest(&self) -> &[T] {
378        bytemuck::cast_slice(self.0.rest())
379    }
380}
381impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
382    type Item = T;
383
384    fn next(&mut self) -> Option<T> {
385        let size = size_of::<T>();
386        let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
387        self.0.nth(size - 1);
388        Some(r)
389    }
390
391    fn size_hint(&self) -> (usize, Option<usize>) {
392        let (mut min, mut max) = self.0.size_hint();
393        min /= size_of::<T>();
394        if let Some(max) = &mut max {
395            *max /= size_of::<T>();
396        }
397        (min, max)
398    }
399
400    fn nth(&mut self, n: usize) -> Option<T> {
401        let size = size_of::<T>();
402
403        let byte_skip = n.checked_mul(size)?;
404        let byte_end = byte_skip.checked_add(size)?;
405
406        let bytes = self.0.rest().get(byte_skip..byte_end)?;
407        let r = *bytemuck::from_bytes(bytes);
408
409        self.0.nth(byte_end - 1);
410
411        Some(r)
412    }
413
414    fn last(mut self) -> Option<Self::Item>
415    where
416        Self: Sized,
417    {
418        self.next_back()
419    }
420}
421impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
422    fn next_back(&mut self) -> Option<T> {
423        let size = size_of::<T>();
424
425        let len = self.0.rest().len();
426        if len < size {
427            return None;
428        }
429
430        let start = len - size;
431        let bytes = &self.0.rest()[start..];
432        let r = *bytemuck::from_bytes(bytes);
433
434        self.0.nth_back(size - 1);
435
436        Some(r)
437    }
438
439    fn nth_back(&mut self, n: usize) -> Option<T> {
440        let size = size_of::<T>();
441
442        let rev_byte_skip = n.checked_mul(size)?;
443        let rev_byte_end = rev_byte_skip.checked_add(size)?;
444        let len = self.0.rest().len();
445
446        if len < rev_byte_end {
447            return None;
448        }
449
450        let start = len - rev_byte_end;
451        let end = len - rev_byte_skip;
452
453        let bytes = &self.0.rest()[start..end];
454        let r = *bytemuck::from_bytes(bytes);
455
456        self.0.nth_back(rev_byte_end - 1);
457
458        Some(r)
459    }
460}
461impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
462impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
463    fn default() -> Self {
464        IpcBytes::empty().cast::<T>().into_iter()
465    }
466}
467impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
468    type Item = T;
469
470    type IntoIter = IpcBytesCastIntoIter<T>;
471
472    fn into_iter(self) -> Self::IntoIter {
473        IpcBytesCastIntoIter::new(self)
474    }
475}