Skip to main content

zng_task/channel/
ipc_bytes.rs

1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4    cell::Cell,
5    fmt, fs,
6    io::{self, Read, Write},
7    iter::FusedIterator,
8    ops,
9    path::{Path, PathBuf},
10    pin::Pin,
11    sync::{Arc, Weak},
12};
13
14use futures_lite::{AsyncReadExt as _, AsyncWriteExt as _};
15#[cfg(ipc)]
16use ipc_channel::ipc::IpcSharedMemory;
17use serde::{Deserialize, Serialize};
18use zng_app_context::RunOnDrop;
19
20#[cfg(ipc)]
21use crate::channel::ipc_bytes_memmap::Memmap;
22
23/// Immutable bytes vector that can be can be shared fast over IPC.
24///
25/// # Memory Storage
26///
27/// All storage backends are held by an [`Arc`] pointer, so cloning in process is always very cheap.
28///
29/// The `from_*` constructor functions use different storage depending on byte length. Bytes <= 64KB are allocated in the heap
30/// and are copied when shared with another process. Bytes <= 128MB are allocated in shared memory, only the system handle
31/// is copied when shared with another process. Bytes > 128MB are allocated in a temporary file with restricted access and memory mapped
32/// for read, only the file handle and some metadata are copied when shared with another process.
33///
34/// Constructor functions for creating memory maps directly are also provided.
35///
36/// Note that in builds without the `"ipc"` crate feature only heap backend is available, in that case all data lengths are stored in the heap.
37///
38/// # Serialization
39///
40/// When serialized inside [`with_ipc_serialization`] only the system handles and metadata of shared memory and memory maps is serialized.
41/// When serializing outsize the IPC context the data is copied.
42///
43/// When deserializing memory map handles are reconnected and if deserializing bytes selects the best storage based on data length.
44///
45/// [`IpcSender`]: super::IpcSender
46#[derive(Clone)]
47#[repr(transparent)]
48pub struct IpcBytes(pub(super) Arc<IpcBytesData>);
49impl Serialize for IpcBytes {
50    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
51    where
52        S: serde::Serializer,
53    {
54        Serialize::serialize(&*self.0, serializer)
55    }
56}
57impl<'de> Deserialize<'de> for IpcBytes {
58    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59    where
60        D: serde::Deserializer<'de>,
61    {
62        let b = <IpcBytesData as Deserialize>::deserialize(deserializer)?;
63        Ok(Self(Arc::new(b)))
64    }
65}
/// Backing storage of [`IpcBytes`], shared in process behind an `Arc`.
///
/// NOTE(review): the variant order is part of the serde-derived representation
/// (index-based formats encode the variant index) — confirm before reordering.
#[derive(Serialize, Deserialize)]
pub(super) enum IpcBytesData {
    /// Bytes owned by a heap vector, the only backend in builds without `ipc`.
    Heap(Vec<u8>),
    /// Bytes copied into a shared memory region by `IpcSharedMemory::from_bytes`.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Bytes in a file backed memory map.
    #[cfg(ipc)]
    MemMap(Memmap),
}
74impl fmt::Debug for IpcBytes {
75    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76        write!(f, "IpcBytes(<{} bytes>)", self.len())
77    }
78}
79impl ops::Deref for IpcBytes {
80    type Target = [u8];
81
82    fn deref(&self) -> &Self::Target {
83        match &*self.0 {
84            IpcBytesData::Heap(i) => i,
85            #[cfg(ipc)]
86            IpcBytesData::AnonMemMap(m) => m,
87            #[cfg(ipc)]
88            IpcBytesData::MemMap(f) => f,
89        }
90    }
91}
92
93impl IpcBytes {
94    /// New empty.
95    pub fn empty() -> Self {
96        IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
97    }
98}
99
100/// Constructors.
101///
102/// See also [`IpcBytesMut`](crate::channel::IpcBytesMut).
103impl IpcBytes {
104    /// Copy or move data from vector.
105    pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
106        blocking::unblock(move || Self::from_vec_blocking(data)).await
107    }
108
109    /// Copy or move data from vector.
110    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
111        #[cfg(ipc)]
112        {
113            if data.len() <= Self::INLINE_MAX {
114                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
115            } else {
116                Self::from_slice_blocking(&data)
117            }
118        }
119        #[cfg(not(ipc))]
120        {
121            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
122        }
123    }
124
125    /// Copy data from slice.
126    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
127        #[cfg(ipc)]
128        {
129            if data.len() <= Self::INLINE_MAX {
130                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
131            } else if data.len() <= Self::UNNAMED_MAX {
132                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
133            } else {
134                Self::new_memmap_blocking(|m| m.write_all(data))
135            }
136        }
137        #[cfg(not(ipc))]
138        {
139            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
140        }
141    }
142
143    /// Copy data from the iterator.
144    ///
145    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
146    /// will collect to an [`IpcBytesWriter`] that can reallocate multiple times as the buffer grows.
147    ///    
148    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
149    /// is used, if it continues after the given maximum it is clipped.
150    ///
151    /// [`size_hint`]: Iterator::size_hint
152    /// [`IpcBytesWriter`]: crate::channel::IpcBytesWriter
153    pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
154        #[cfg(ipc)]
155        {
156            let (min, max) = iter.size_hint();
157            if let Some(max) = max {
158                if max <= Self::INLINE_MAX {
159                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
160                } else if max == min {
161                    let mut r = super::IpcBytesMut::new(max).await?;
162                    let mut actual_len = 0;
163                    for (i, b) in r.iter_mut().zip(iter) {
164                        *i = b;
165                        actual_len += 1;
166                    }
167                    r.truncate(actual_len);
168                    return r.finish().await;
169                }
170            }
171
172            let mut writer = Self::new_writer().await;
173            for b in iter {
174                writer.write_all(&[b]).await?;
175            }
176            writer.finish().await
177        }
178
179        #[cfg(not(ipc))]
180        {
181            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
182        }
183    }
184
185    /// Copy data from the iterator.
186    ///
187    /// This is most efficient if the [`size_hint`] indicates an exact length (min equals max), otherwise this
188    /// will collect to an [`IpcBytesWriterBlocking`] that can reallocate multiple times as the buffer grows.
189    ///
190    /// Note that if the iterator gives an exact length that is the maximum taken, if it ends early the smaller length
191    /// is used, if it continues after the given maximum it is clipped.
192    ///
193    /// [`size_hint`]: Iterator::size_hint
194    ///
195    /// [`IpcBytesWriterBlocking`]: crate::channel::IpcBytesWriterBlocking
196    pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
197        #[cfg(ipc)]
198        {
199            let (min, max) = iter.size_hint();
200            if let Some(max) = max {
201                if max <= Self::INLINE_MAX {
202                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
203                } else if max == min {
204                    let mut r = super::IpcBytesMut::new_blocking(max)?;
205                    let mut actual_len = 0;
206                    for (i, b) in r.iter_mut().zip(iter) {
207                        *i = b;
208                        actual_len += 1;
209                    }
210                    r.truncate(actual_len);
211                    return r.finish_blocking();
212                }
213            }
214
215            let mut writer = Self::new_writer_blocking();
216            for b in iter {
217                writer.write_all(&[b])?;
218            }
219            writer.finish()
220        }
221        #[cfg(not(ipc))]
222        {
223            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
224        }
225    }
226
227    /// Read `data` into shared memory.
228    pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
229        #[cfg(ipc)]
230        {
231            Self::from_read_ipc(data).await
232        }
233        #[cfg(not(ipc))]
234        {
235            let mut data = data;
236            let mut buf = vec![];
237            data.read_to_end(&mut buf).await;
238            Self::from_vec(buf).await
239        }
240    }
241    #[cfg(ipc)]
242    async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
243        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
244        let mut len = 0;
245
246        // INLINE_MAX read
247        loop {
248            match data.read(&mut buf[len..]).await {
249                Ok(l) => {
250                    if l == 0 {
251                        // is <= INLINE_MAX
252                        buf.truncate(len);
253                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
254                    } else {
255                        len += l;
256                        if len == Self::INLINE_MAX + 1 {
257                            // goto UNNAMED_MAX read
258                            break;
259                        }
260                    }
261                }
262                Err(e) => match e.kind() {
263                    io::ErrorKind::WouldBlock => continue,
264                    _ => return Err(e),
265                },
266            }
267        }
268
269        // UNNAMED_MAX read
270        buf.resize(Self::UNNAMED_MAX + 1, 0);
271        loop {
272            match data.read(&mut buf[len..]).await {
273                Ok(l) => {
274                    if l == 0 {
275                        // is <= UNNAMED_MAX
276                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
277                    } else {
278                        len += l;
279                        if len == Self::UNNAMED_MAX + 1 {
280                            // goto named file loop
281                            break;
282                        }
283                    }
284                }
285                Err(e) => match e.kind() {
286                    io::ErrorKind::WouldBlock => continue,
287                    _ => return Err(e),
288                },
289            }
290        }
291
292        // named file copy
293        Self::new_memmap(async |m| {
294            use futures_lite::AsyncWriteExt as _;
295
296            m.write_all(&buf).await?;
297            crate::io::copy(data, m).await?;
298            Ok(())
299        })
300        .await
301    }
302
303    /// Read `data` into shared memory.
304    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
305        #[cfg(ipc)]
306        {
307            Self::from_read_blocking_ipc(data)
308        }
309        #[cfg(not(ipc))]
310        {
311            let mut buf = vec![];
312            data.read_to_end(&mut buf)?;
313            Self::from_vec_blocking(buf)
314        }
315    }
316    #[cfg(ipc)]
317    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
318        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
319        let mut len = 0;
320
321        // INLINE_MAX read
322        loop {
323            match data.read(&mut buf[len..]) {
324                Ok(l) => {
325                    if l == 0 {
326                        // is <= INLINE_MAX
327                        buf.truncate(len);
328                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
329                    } else {
330                        len += l;
331                        if len == Self::INLINE_MAX + 1 {
332                            // goto UNNAMED_MAX read
333                            break;
334                        }
335                    }
336                }
337                Err(e) => match e.kind() {
338                    io::ErrorKind::WouldBlock => continue,
339                    _ => return Err(e),
340                },
341            }
342        }
343
344        // UNNAMED_MAX read
345        buf.resize(Self::UNNAMED_MAX + 1, 0);
346        loop {
347            match data.read(&mut buf[len..]) {
348                Ok(l) => {
349                    if l == 0 {
350                        // is <= UNNAMED_MAX
351                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
352                    } else {
353                        len += l;
354                        if len == Self::UNNAMED_MAX + 1 {
355                            // goto named file loop
356                            break;
357                        }
358                    }
359                }
360                Err(e) => match e.kind() {
361                    io::ErrorKind::WouldBlock => continue,
362                    _ => return Err(e),
363                },
364            }
365        }
366
367        // named file copy
368        Self::new_memmap_blocking(|m| {
369            m.write_all(&buf)?;
370            io::copy(data, m)?;
371            Ok(())
372        })
373    }
374
375    /// Read `path` into shared memory.
376    pub async fn from_path(path: PathBuf) -> io::Result<Self> {
377        let file = crate::fs::File::open(path).await?;
378        Self::from_file(file).await
379    }
380    /// Read `file` into shared memory.
381    pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
382        #[cfg(ipc)]
383        {
384            let len = file.metadata().await?.len();
385            if len <= Self::UNNAMED_MAX as u64 {
386                let mut buf = vec![0u8; len as usize];
387                file.read_exact(&mut buf).await?;
388                Self::from_vec_blocking(buf)
389            } else {
390                Self::new_memmap(async move |m| {
391                    crate::io::copy(&mut file, m).await?;
392                    Ok(())
393                })
394                .await
395            }
396        }
397        #[cfg(not(ipc))]
398        {
399            let mut buf = vec![];
400            file.read_to_end(&mut buf).await?;
401            Self::from_vec_blocking(buf)
402        }
403    }
404
405    /// Read `path` into shared memory.
406    pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
407        let file = fs::File::open(path)?;
408        Self::from_file_blocking(file)
409    }
410    /// Read `file` into shared memory.
411    pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
412        #[cfg(ipc)]
413        {
414            let len = file.metadata()?.len();
415            if len <= Self::UNNAMED_MAX as u64 {
416                let mut buf = vec![0u8; len as usize];
417                file.read_exact(&mut buf)?;
418                Self::from_vec_blocking(buf)
419            } else {
420                Self::new_memmap_blocking(|m| {
421                    io::copy(&mut file, m)?;
422                    Ok(())
423                })
424            }
425        }
426        #[cfg(not(ipc))]
427        {
428            let mut buf = vec![];
429            file.read_to_end(&mut buf)?;
430            Self::from_vec_blocking(buf)
431        }
432    }
433
434    /// Create a memory mapped file.
435    ///
436    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
437    /// always selects the slowest options, a file backed memory map.
438    #[cfg(ipc)]
439    pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
440        use crate::channel::ipc_bytes_memmap::MemmapMut;
441
442        let file = blocking::unblock(MemmapMut::begin_write).await?;
443        let mut file = crate::fs::File::from(file);
444        write(&mut file).await?;
445
446        match file.try_unwrap().await {
447            Ok(f) => {
448                let map = blocking::unblock(move || Memmap::end_write(f)).await?;
449                Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
450            }
451            Err(_) => Err(io::Error::new(
452                io::ErrorKind::ResourceBusy,
453                "no all tasks started by `write` awaited before return",
454            )),
455        }
456    }
457
458    /// Memory map an existing file.
459    ///
460    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`]
461    // if the file does not have enough bytes. Returns [`io::ErrorKind::FileTooLarge`] if the range length or file length is
462    // greater than `usize::MAX`.
463    ///
464    /// # Safety
465    ///
466    /// Caller must ensure the `file` is not modified while all clones of the `IpcBytes` exists in the current process and others.
467    #[cfg(ipc)]
468    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
469        blocking::unblock(move || {
470            // SAFETY: up to the caller
471            unsafe { Self::open_memmap_blocking(file, range) }
472        })
473        .await
474    }
475
476    /// Create a memory mapped file.
477    ///
478    /// Note that the `from_` functions select optimized backing storage depending on data length, this function
479    /// always selects the slowest options, a file backed memory map.
480    #[cfg(ipc)]
481    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
482        use crate::channel::ipc_bytes_memmap::MemmapMut;
483
484        let mut file = MemmapMut::begin_write()?;
485        write(&mut file)?;
486        let map = Memmap::end_write(file)?;
487
488        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
489    }
490
491    /// Memory map an existing file.
492    ///
493    /// The `range` defines the slice of the `file` that will be mapped. Returns [`io::ErrorKind::UnexpectedEof`]
494    // if the file does not have enough bytes. Returns [`io::ErrorKind::FileTooLarge`] if the range length or file length is
495    // greater than `usize::MAX`.
496    ///
497    /// # Safety
498    ///
499    /// Caller must ensure the `file` is not modified or removed while all clones of the `IpcBytes` exists in the current process and others.
500    #[cfg(ipc)]
501    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
502        // SAFETY: up to the caller
503        let map = unsafe { Memmap::read_user_file(file, range) }?;
504
505        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
506    }
507
508    /// Gets if both point to the same memory.
509    pub fn ptr_eq(&self, other: &Self) -> bool {
510        let a = &self[..];
511        let b = &other[..];
512        (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
513    }
514
515    #[cfg(ipc)]
516    pub(super) const INLINE_MAX: usize = 64 * 1024; // 64KB
517    #[cfg(ipc)]
518    pub(super) const UNNAMED_MAX: usize = 128 * 1024 * 1024; // 128MB
519}
520
521impl AsRef<[u8]> for IpcBytes {
522    fn as_ref(&self) -> &[u8] {
523        &self[..]
524    }
525}
526impl Default for IpcBytes {
527    fn default() -> Self {
528        Self::empty()
529    }
530}
531impl PartialEq for IpcBytes {
532    fn eq(&self, other: &Self) -> bool {
533        self.ptr_eq(other) || self[..] == other[..]
534    }
535}
536impl Eq for IpcBytes {}
537
/// Runs `serialize` with IPC serialization enabled on the current thread.
///
/// Inside this context [`IpcBytes`] serializes only the system handles and metadata of shared memory and
/// memory maps. IPC channels like [`IpcSender`] serialize their messages inside this context.
///
/// The previous state is restored by a drop guard after `serialize`, so nested calls are supported.
/// Use [`is_ipc_serialization`] to check if inside the context.
///
/// [`IpcSender`]: super::IpcSender
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let outer = IPC_SERIALIZATION.with(|flag| flag.replace(true));
    let _restore = RunOnDrop::new(move || IPC_SERIALIZATION.with(|flag| flag.set(outer)));
    serialize()
}
551
/// Returns `true` when the current thread is inside [`with_ipc_serialization`].
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.with(Cell::get)
}
557
// Per-thread flag, `true` while the current thread is inside `with_ipc_serialization`
// (set and restored by that function, read by `is_ipc_serialization`).
#[cfg(ipc)]
thread_local! {
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
562
563impl IpcBytes {
564    /// Create a weak in process reference.
565    ///
566    /// Note that the weak reference cannot upgrade if only another process holds a strong reference.
567    pub fn downgrade(&self) -> WeakIpcBytes {
568        WeakIpcBytes(Arc::downgrade(&self.0))
569    }
570}
571
572/// Weak reference to an in process [`IpcBytes`].
573pub struct WeakIpcBytes(Weak<IpcBytesData>);
574impl WeakIpcBytes {
575    /// Get strong reference if any exists in the process.
576    pub fn upgrade(&self) -> Option<IpcBytes> {
577        self.0.upgrade().map(IpcBytes)
578    }
579
580    /// Count of strong references in the process.
581    pub fn strong_count(&self) -> usize {
582        self.0.strong_count()
583    }
584}
585
// Slice iterator is very efficient, but it holds a reference, so we hold a self reference.
// The alternative to this is copying the unsafe code from std and adapting it or implementing
// a much slower index based iterator.
//
// `self_cell` pairs the owning `IpcBytes` with a `slice::Iter` that borrows from it, so the
// iterator can be stored and moved without an external lifetime.
type SliceIter<'a> = std::slice::Iter<'a, u8>;
// NOTE(review): `#[covariant]` asserts `SliceIter<'a>` is covariant in `'a`, which holds for `slice::Iter`.
self_cell::self_cell! {
    struct IpcBytesIntoIterInner {
        owner: IpcBytes,
        #[covariant]
        dependent: SliceIter,
    }
}
597
598/// An [`IpcBytes`] iterator that holds a strong reference to it.
599pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
600impl IpcBytesIntoIter {
601    fn new(bytes: IpcBytes) -> Self {
602        Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
603    }
604
605    /// The source bytes.
606    pub fn source(&self) -> &IpcBytes {
607        self.0.borrow_owner()
608    }
609
610    /// Bytes not yet iterated.
611    pub fn rest(&self) -> &[u8] {
612        self.0.borrow_dependent().as_slice()
613    }
614}
615impl Iterator for IpcBytesIntoIter {
616    type Item = u8;
617
618    fn next(&mut self) -> Option<u8> {
619        self.0.with_dependent_mut(|_, d| d.next().copied())
620    }
621
622    fn size_hint(&self) -> (usize, Option<usize>) {
623        self.0.borrow_dependent().size_hint()
624    }
625
626    fn count(self) -> usize
627    where
628        Self: Sized,
629    {
630        self.0.borrow_dependent().as_slice().len()
631    }
632
633    fn nth(&mut self, n: usize) -> Option<u8> {
634        self.0.with_dependent_mut(|_, d| d.nth(n).copied())
635    }
636
637    fn last(mut self) -> Option<Self::Item>
638    where
639        Self: Sized,
640    {
641        self.next_back()
642    }
643}
644impl DoubleEndedIterator for IpcBytesIntoIter {
645    fn next_back(&mut self) -> Option<Self::Item> {
646        self.0.with_dependent_mut(|_, d| d.next_back().copied())
647    }
648
649    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
650        self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
651    }
652}
653impl FusedIterator for IpcBytesIntoIter {}
654impl Default for IpcBytesIntoIter {
655    fn default() -> Self {
656        IpcBytes::empty().into_iter()
657    }
658}
659impl IntoIterator for IpcBytes {
660    type Item = u8;
661
662    type IntoIter = IpcBytesIntoIter;
663
664    fn into_iter(self) -> Self::IntoIter {
665        IpcBytesIntoIter::new(self)
666    }
667}