// zng_task/channel/ipc_bytes_mut.rs
1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    fmt,
5    io::{self, Write as _},
6    mem::MaybeUninit,
7    ops,
8    path::PathBuf,
9    pin::Pin,
10    sync::Arc,
11};
12
13#[cfg(ipc)]
14use ipc_channel::ipc::IpcSharedMemory;
15
16use crate::channel::IpcBytes;
17use crate::channel::ipc_bytes::IpcBytesData;
18#[cfg(ipc)]
19use crate::channel::ipc_bytes_memmap::MemmapMut;
20
/// Backing storage for [`IpcBytesMut`], selected by length in the constructors.
enum IpcBytesMutInner {
    /// Process local heap allocation, used for small lengths (and always without the `ipc` cfg).
    Heap(Vec<u8>),
    /// Anonymous shared memory, used for medium lengths.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// File backed memory map, used for large lengths.
    #[cfg(ipc)]
    MemMap(MemmapMut),
}
28
/// Represents preallocated exclusive mutable memory that can be converted to [`IpcBytes`].
///
/// Like [`IpcBytes`] three storage modes are supported, heap, shared memory and file backed memory map. Most
/// efficient mode is selected automatically for the given length, unless the constructor function explicitly states otherwise.
pub struct IpcBytesMut {
    // backing storage, selected by length at construction
    inner: IpcBytesMutInner,
    // logical length in bytes, can be less than the allocation length after `truncate`
    len: usize,
}
37impl ops::Deref for IpcBytesMut {
38    type Target = [u8];
39
40    fn deref(&self) -> &Self::Target {
41        let len = self.len;
42        match &self.inner {
43            IpcBytesMutInner::Heap(v) => &v[..len],
44            #[cfg(ipc)]
45            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
46            #[cfg(ipc)]
47            IpcBytesMutInner::MemMap(m) => &m[..len],
48        }
49    }
50}
51impl ops::DerefMut for IpcBytesMut {
52    fn deref_mut(&mut self) -> &mut Self::Target {
53        let len = self.len;
54        match &mut self.inner {
55            IpcBytesMutInner::Heap(v) => &mut v[..len],
56            #[cfg(ipc)]
57            IpcBytesMutInner::AnonMemMap(m) => {
58                // SAFETY: we are the only reference to the map
59                unsafe { m.deref_mut() }
60            }
61            #[cfg(ipc)]
62            IpcBytesMutInner::MemMap(m) => &mut m[..len],
63        }
64    }
65}
66impl fmt::Debug for IpcBytesMut {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        write!(f, "IpcBytesMut(<{} bytes>)", self.len())
69    }
70}
impl IpcBytesMut {
    /// Allocate zeroed mutable memory.
    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            // small lengths use the heap, fast enough to allocate inline
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            // medium lengths use anonymous shared memory
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // large lengths use a file backed memory map, created in the blocking thread-pool
            blocking::unblock(move || Self::new_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Allocate zeroed mutable memory.
    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            // small lengths use the heap
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            // medium lengths use anonymous shared memory
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // large lengths use a file backed memory map
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap(MemmapMut::new(len)?),
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Allocate zeroed mutable memory in a memory map.
    ///
    /// Note that [`new`] automatically selects the best memory storage for the given `len`, this
    /// function enforces the usage of a memory map, the slowest of the options.
    ///
    /// [`new`]: Self::new
    #[cfg(ipc)]
    pub async fn new_memmap(len: usize) -> io::Result<Self> {
        blocking::unblock(move || Self::new_memmap_blocking(len)).await
    }

    /// Allocate zeroed mutable memory in a memory map.
    ///
    /// Note that [`new_blocking`] automatically selects the best memory storage for the given `len`, this
    /// function enforces the usage of a memory map, the slowest of the options.
    ///
    /// [`new_blocking`]: Self::new_blocking
    #[cfg(ipc)]
    pub fn new_memmap_blocking(len: usize) -> io::Result<Self> {
        Ok(Self {
            len,
            inner: IpcBytesMutInner::MemMap(MemmapMut::new(len)?),
        })
    }

    /// Uses `buf` or copies it to exclusive mutable memory.
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            // small enough to reuse the vec allocation directly
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            // copy to shared memory or memmap in the blocking thread-pool
            blocking::unblock(move || {
                let mut b = Self::new_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Uses `buf` or copies it to exclusive mutable memory.
    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            // small enough to reuse the vec allocation directly
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            let mut b = Self::new_blocking(buf.len())?;
            b[..].copy_from_slice(&buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Copy `buf` to exclusive mutable memory.
    pub fn from_slice_blocking(buf: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf.to_vec()),
            })
        } else {
            let mut b = Self::new_blocking(buf.len())?;
            b[..].copy_from_slice(buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf.to_vec()),
            })
        }
    }

    /// Use or copy bytes to exclusive mutable memory.
    pub async fn from_bytes(bytes: IpcBytes) -> io::Result<Self> {
        blocking::unblock(move || Self::from_bytes_blocking(bytes)).await
    }

    /// Use or copy `bytes` to exclusive mutable memory.
    pub fn from_bytes_blocking(bytes: IpcBytes) -> io::Result<Self> {
        #[cfg_attr(not(ipc), allow(irrefutable_let_patterns))]
        if let IpcBytesData::Heap(_) = &*bytes.0 {
            // heap storage can be reused directly if this is the only reference
            match Arc::try_unwrap(bytes.0) {
                Ok(r) => match r {
                    IpcBytesData::Heap(r) => Ok(Self {
                        len: r.len(),
                        inner: IpcBytesMutInner::Heap(r),
                    }),
                    // outer `if let` already matched `Heap`
                    _ => unreachable!(),
                },
                // shared with other clones, must copy
                Err(a) => Self::from_slice_blocking(&IpcBytes(a)[..]),
            }
        } else {
            Self::from_slice_blocking(&bytes[..])
        }
    }

    /// Memory map an existing file.
    ///
    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`]
    /// if the file does not have enough bytes. Returns [`io::ErrorKind::FileTooLarge`] if the range length or file length is
    /// greater than `usize::MAX`.
    ///
    /// # Safety
    ///
    /// Caller must ensure the `file` is not modified or removed while the `IpcBytesMut` instance lives, or any instance of
    /// [`IpcBytes`] later created from this.
    #[cfg(ipc)]
    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
        blocking::unblock(move || {
            // SAFETY: up to the caller
            unsafe { Self::open_memmap_blocking(file, range) }
        })
        .await
    }

    /// Memory map an existing file.
    ///
    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`]
    /// if the file does not have enough bytes. Returns [`io::ErrorKind::FileTooLarge`] if the range length or file length is
    /// greater than `usize::MAX`.
    ///
    /// # Safety
    ///
    /// Caller must ensure the `file` is not modified or removed while the `IpcBytesMut` instance lives, or any instance of
    /// [`IpcBytes`] later created from this.
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
        // SAFETY: up to the caller
        let map = unsafe { MemmapMut::write_user_file(file, range) }?;

        Ok(Self {
            len: map.len(),
            inner: IpcBytesMutInner::MemMap(map),
        })
    }

    /// Create a new zeroed file and memory map it.
    ///
    /// # Safety
    ///
    /// Caller must ensure the `file` is not modified or removed while the `IpcBytesMut` instance lives, or any instance of
    /// [`IpcBytes`] later created from this.
    #[cfg(ipc)]
    pub async unsafe fn create(file: PathBuf, len: usize) -> io::Result<Self> {
        blocking::unblock(move || {
            // SAFETY: up to the caller
            unsafe { Self::create_blocking(file, len) }
        })
        .await
    }

    /// Create a new zeroed file and memory map it.
    ///
    /// # Safety
    ///
    /// Caller must ensure the `file` is not modified or removed while the `IpcBytesMut` instance lives, or any instance of
    /// [`IpcBytes`] later created from this.
    #[cfg(ipc)]
    pub unsafe fn create_blocking(file: PathBuf, len: usize) -> io::Result<Self> {
        // SAFETY: up to the caller
        let map = unsafe { MemmapMut::create_user_file(file, len) }?;

        Ok(Self {
            len,
            inner: IpcBytesMutInner::MemMap(map),
        })
    }
}
316impl IpcBytesMut {
317    /// Convert to immutable shareable [`IpcBytes`].
318    pub async fn finish(mut self) -> io::Result<IpcBytes> {
319        let len = self.len;
320        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
321            IpcBytesMutInner::Heap(mut v) => {
322                v.truncate(len);
323                v.shrink_to_fit();
324                IpcBytesData::Heap(v)
325            }
326            #[cfg(ipc)]
327            IpcBytesMutInner::AnonMemMap(m) => {
328                if len < IpcBytes::INLINE_MAX {
329                    IpcBytesData::Heap(m[..len].to_vec())
330                } else if len < m.len() {
331                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
332                } else {
333                    IpcBytesData::AnonMemMap(m)
334                }
335            }
336            #[cfg(ipc)]
337            IpcBytesMutInner::MemMap(m) => {
338                let m = m.into_read_only()?;
339                IpcBytesData::MemMap(m)
340            }
341        };
342        Ok(IpcBytes(Arc::new(data)))
343    }
344
345    /// Convert to immutable shareable [`IpcBytes`].
346    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
347        let len = self.len;
348        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
349            IpcBytesMutInner::Heap(mut v) => {
350                v.truncate(len);
351                IpcBytesData::Heap(v)
352            }
353            #[cfg(ipc)]
354            IpcBytesMutInner::AnonMemMap(m) => {
355                if len < IpcBytes::INLINE_MAX {
356                    IpcBytesData::Heap(m[..len].to_vec())
357                } else if len < m.len() {
358                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
359                } else {
360                    IpcBytesData::AnonMemMap(m)
361                }
362            }
363            #[cfg(ipc)]
364            IpcBytesMutInner::MemMap(m) => {
365                let m = m.into_read_only()?;
366                IpcBytesData::MemMap(m)
367            }
368        };
369        Ok(IpcBytes(Arc::new(data)))
370    }
371}
372impl IpcBytesMut {
373    /// Shorten the bytes length.
374    ///
375    /// If `new_len` is greater or equal to current length does nothing.
376    ///
377    /// Note that this does not affect memory allocation, the extra bytes are only dropped on finish.
378    pub fn truncate(&mut self, new_len: usize) {
379        self.len = self.len.min(new_len);
380    }
381
382    /// Convert chunks of length `L0` to chunks of length `L1` where `L1 <= L0`.
383    ///
384    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
385    ///
386    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
387    ///
388    /// # Panics
389    ///
390    /// Panics if `L1 > L0` or if bytes length is not multiple of `L0`.
391    pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
392        assert!(L1 <= L0);
393
394        let self_ = &mut self[..];
395
396        let len = self_.len();
397        if len == 0 {
398            return;
399        }
400        assert!(len.is_multiple_of(L0), "length must be multiple of L0");
401
402        let ptr = self_.as_mut_ptr();
403        let mut write = 0usize;
404        let mut read = 0usize;
405
406        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
407        unsafe {
408            while read < len {
409                let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
410                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
411                read += L0;
412
413                let out_chunk = reduce(in_chunk.assume_init());
414
415                std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
416                write += L1;
417            }
418        }
419
420        self.truncate(write);
421    }
422
423    /// Convert chunks of `in_chunk_len` to chunks of `out_chunk_buf.len()` where `out_chunk_buf.len() <= in_chunk_len`.
424    ///
425    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
426    ///
427    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
428    ///
429    /// # Panics
430    ///
431    /// Panics if `out_chunk_buf.len() > in_chunk_len` or if bytes length is not multiple of `in_chunk_len`.
432    pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
433        assert!(out_chunk_buf.len() < in_chunk_len);
434
435        let self_ = &mut self[..];
436
437        let len = self_.len();
438        if len == 0 {
439            return;
440        }
441        assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
442
443        let ptr = self_.as_mut_ptr();
444        let mut write = 0usize;
445        let mut read = 0usize;
446
447        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
448        unsafe {
449            while read < len {
450                reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
451                read += in_chunk_len;
452
453                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
454                write += out_chunk_buf.len();
455            }
456        }
457
458        self.truncate(write);
459    }
460
461    /// Convert chunks of length `L0` to chunks of length `L1` where `size_of::<T1>() * L1 <= size_of::<T0>() * L0`.
462    ///
463    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
464    ///
465    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
466    ///
467    /// # Panics
468    ///
469    /// Panics if `size_of::<T1>() * L1 > size_of::<T0>() * L0` or if bytes length is not multiple of `size_of::<T0>() * L0`.
470    pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
471    where
472        T0: bytemuck::AnyBitPattern,
473    {
474        let l0 = std::mem::size_of::<T0>() * L0;
475        let l1 = std::mem::size_of::<T1>() * L1;
476        assert!(l1 <= l0);
477
478        let self_ = &mut self[..];
479
480        let len = self_.len();
481        if len == 0 {
482            return;
483        }
484        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
485
486        let ptr = self_.as_mut_ptr();
487        let mut write = 0usize;
488        let mut read = 0usize;
489
490        // SAFETY:
491        // Pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
492        // Reading [T0; L0] from raw bytes is safe because T0: AnyBitPattern
493        unsafe {
494            while read < len {
495                let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
496                std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
497                read += l0;
498
499                let out_chunk = reduce(in_chunk.assume_init());
500
501                std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
502                write += l1;
503            }
504        }
505
506        self.truncate(write);
507    }
508
509    /// Convert chunks of `size_of::<T0>() * in_chunk_len` to chunks of `size_of::<T1>() * out_chunk_buf.len()`
510    /// where `size_of::<T1>() * out_chunk_buf.len() <= size_of::<T0>() * in_chunk_len`.
511    ///
512    /// Reuses the same allocation for the new data, replacing in place. Truncates the final buffer to the new length.
513    ///
514    /// Note that no memory is released by this call as truncated is lazy and applied on finish.
515    ///
516    /// # Panics
517    ///
518    /// Panics if `size_of::<T1>() * out_chunk_buf.len() > size_of::<T0>() * in_chunk_len` or if bytes
519    /// length is not multiple of `size_of::<T0>() * in_chunk_len`.
520    pub fn cast_reduce_in_place_dyn<T0, T1>(
521        &mut self,
522        in_chunk_len: usize,
523        out_chunk_buf: &mut [T1],
524        mut reduce: impl FnMut(&[T0], &mut [T1]),
525    ) where
526        T0: bytemuck::AnyBitPattern,
527    {
528        let l0 = std::mem::size_of::<T0>() * in_chunk_len;
529        let l1 = std::mem::size_of_val(out_chunk_buf);
530
531        assert!(l1 <= l0);
532
533        let self_ = &mut self[..];
534
535        let len = self_.len();
536        if len == 0 {
537            return;
538        }
539        assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
540
541        let ptr = self_.as_mut_ptr();
542        let mut write = 0usize;
543        let mut read = 0usize;
544
545        // SAFETY: pointers stay inside slice, in_chunk and out_chunk copy never overlaps with slice.
546        unsafe {
547            while read < len {
548                reduce(
549                    bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
550                    &mut *out_chunk_buf,
551                );
552                read += l0;
553
554                std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
555                write += l1;
556            }
557        }
558
559        self.truncate(write);
560    }
561
562    /// Reverses the order of chunks in the slice, in place.
563    ///
564    /// Chunk length is const L.
565    ///
566    /// # Panics
567    ///
568    /// Panics if length is not multiple of `L`.
569    pub fn reverse_chunks<const L: usize>(&mut self) {
570        let self_ = &mut self[..];
571
572        let len = self_.len();
573
574        if len == 0 || L == 0 {
575            return;
576        }
577
578        if L == 1 {
579            return self_.reverse();
580        }
581
582        assert!(len.is_multiple_of(L), "length must be multiple of L");
583
584        // SAFETY: already verified is multiple and already handled L=0
585        unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
586    }
587
588    /// Reverses the order of chunks in the slice, in place.
589    ///
590    /// # Panics
591    ///
592    /// Panics if length is not multiple of `chunk_len`.
593    pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
594        let self_ = &mut self[..];
595
596        let len = self_.len();
597
598        if len == 0 || chunk_len == 0 {
599            return;
600        }
601
602        if chunk_len == 1 {
603            return self_.reverse();
604        }
605
606        assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
607
608        let mut a = 0;
609        let mut b = len - chunk_len;
610
611        let ptr = self_.as_mut_ptr();
612
613        // SAFETY: chunks are not overlapping and loop stops before at mid, chunk_len > 1
614        unsafe {
615            while a < b {
616                std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
617                a += chunk_len;
618                b -= chunk_len;
619            }
620        }
621    }
622}
623
/// Represents an async [`IpcBytes`] writer.
///
/// Use [`IpcBytes::new_writer`] to start writing.
pub struct IpcBytesWriter {
    // blocking writer adapted to async, all I/O runs in the blocking thread-pool
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
630impl IpcBytesWriter {
631    /// Finish writing and move data to a shareable [`IpcBytes`].
632    pub async fn finish(self) -> std::io::Result<IpcBytes> {
633        let inner = self.inner.into_inner().await;
634        blocking::unblock(move || inner.finish()).await
635    }
636
637    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
638    pub async fn finish_mut(self) -> std::io::Result<super::IpcBytesMut> {
639        let inner = self.inner.into_inner().await;
640        blocking::unblock(move || inner.finish_mut()).await
641    }
642}
643impl crate::io::AsyncWrite for IpcBytesWriter {
644    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
645        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
646    }
647
648    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
649        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
650    }
651
652    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
653        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
654    }
655}
656impl crate::io::AsyncSeek for IpcBytesWriter {
657    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
658        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
659    }
660}
661
/// Represents a blocking [`IpcBytes`] writer.
///
/// Use [`IpcBytes::new_writer_blocking`] to start writing.
pub struct IpcBytesWriterBlocking {
    // holds all data while it fits `IpcBytes::UNNAMED_MAX`, after the
    // memmap file is created it is reused as a write buffer for the file
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    // backing file, lazily created when data exceeds the heap buffer or on seek
    #[cfg(ipc)]
    memmap: Option<std::fs::File>,

    // without `ipc` everything stays on the heap
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
674impl IpcBytesWriterBlocking {
675    /// Finish writing and move data to a shareable [`IpcBytes`].
676    pub fn finish(self) -> std::io::Result<IpcBytes> {
677        let m = self.finish_mut()?;
678        m.finish_blocking()
679    }
680
681    /// Mode data to an exclusive mutable [`IpcBytes`] that can be further modified, but not resized.
682    pub fn finish_mut(mut self) -> std::io::Result<super::IpcBytesMut> {
683        self.flush()?;
684        #[cfg(ipc)]
685        {
686            let (len, inner) = match self.memmap {
687                Some(file) => {
688                    let map = MemmapMut::end_write(file)?;
689                    let len = map.len();
690                    (len, IpcBytesMutInner::MemMap(map))
691                }
692                None => {
693                    let len = self.heap_buf.len();
694                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
695                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
696                    } else {
697                        IpcBytesMutInner::Heap(self.heap_buf)
698                    };
699                    (len, i)
700                }
701            };
702            Ok(IpcBytesMut { len, inner })
703        }
704        #[cfg(not(ipc))]
705        {
706            let heap_buf = self.heap_buf.into_inner();
707            let len = heap_buf.len();
708            let inner = IpcBytesMutInner::Heap(heap_buf);
709            Ok(IpcBytesMut { len, inner })
710        }
711    }
712
713    #[cfg(ipc)]
714    fn alloc_memmap_file(&mut self) -> io::Result<()> {
715        if self.memmap.is_none() {
716            self.memmap = Some(MemmapMut::begin_write()?);
717        }
718        let file = &mut self.memmap.as_mut().unwrap();
719
720        file.write_all(&self.heap_buf)?;
721        // already allocated UNNAMED_MAX, continue using it as a large buffer
722        self.heap_buf.clear();
723        Ok(())
724    }
725}
726impl std::io::Write for IpcBytesWriterBlocking {
727    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
728        #[cfg(ipc)]
729        {
730            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
731                // write exceed heap buffer, convert to memmap or flush to existing memmap
732                self.alloc_memmap_file()?;
733
734                if write_buf.len() > IpcBytes::UNNAMED_MAX {
735                    // writing massive payload, skip buffer
736                    self.memmap.as_mut().unwrap().write_all(write_buf)?;
737                } else {
738                    self.heap_buf.extend_from_slice(write_buf);
739                }
740            } else {
741                if self.memmap.is_none() {
742                    // heap buffer not fully allocated yet, ensure we only allocate up to UNNAMED_MAX
743                    self.heap_buf
744                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
745                }
746                self.heap_buf.extend_from_slice(write_buf);
747            }
748
749            Ok(write_buf.len())
750        }
751
752        #[cfg(not(ipc))]
753        {
754            std::io::Write::write(&mut self.heap_buf, write_buf)
755        }
756    }
757
758    fn flush(&mut self) -> io::Result<()> {
759        #[cfg(ipc)]
760        if let Some(file) = &mut self.memmap {
761            if !self.heap_buf.is_empty() {
762                file.write_all(&self.heap_buf)?;
763                self.heap_buf.clear();
764            }
765            file.flush()?;
766        }
767        Ok(())
768    }
769}
770impl std::io::Seek for IpcBytesWriterBlocking {
771    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
772        #[cfg(ipc)]
773        {
774            self.alloc_memmap_file()?;
775            let file = self.memmap.as_mut().unwrap();
776            if !self.heap_buf.is_empty() {
777                file.write_all(&self.heap_buf)?;
778                self.heap_buf.clear();
779            }
780            file.seek(pos)
781        }
782        #[cfg(not(ipc))]
783        {
784            std::io::Seek::seek(&mut self.heap_buf, pos)
785        }
786    }
787}
788
789impl IpcBytes {
790    /// Start a memory efficient async writer for creating a `IpcBytes` with unknown length.
791    pub async fn new_writer() -> IpcBytesWriter {
792        IpcBytesWriter {
793            inner: blocking::Unblock::new(Self::new_writer_blocking()),
794        }
795    }
796
797    /// Start a memory efficient blocking writer for creating a `IpcBytes` with unknown length.
798    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
799        IpcBytesWriterBlocking {
800            #[cfg(ipc)]
801            heap_buf: vec![],
802            #[cfg(ipc)]
803            memmap: None,
804
805            #[cfg(not(ipc))]
806            heap_buf: std::io::Cursor::new(vec![]),
807        }
808    }
809}