1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 cell::Cell,
5 fmt, fs,
6 io::{self, Read, Write},
7 iter::FusedIterator,
8 marker::PhantomData,
9 mem::MaybeUninit,
10 ops,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{Arc, Weak},
14};
15
16use futures_lite::{AsyncReadExt, AsyncWriteExt as _};
17#[cfg(ipc)]
18use ipc_channel::ipc::IpcSharedMemory;
19use parking_lot::Mutex;
20use serde::{Deserialize, Serialize, de::VariantAccess};
21use zng_app_context::RunOnDrop;
22
/// Immutable, cheaply clonable byte buffer designed to be shared across processes.
///
/// Cloning only bumps an `Arc` reference count; the bytes themselves are never copied.
/// The backing storage strategy is chosen by the constructors based on length
/// (see `IpcBytesData`).
#[derive(Clone)]
#[repr(transparent)]
pub struct IpcBytes(Arc<IpcBytesData>);
/// Backing storage for [`IpcBytes`], selected by buffer length.
enum IpcBytesData {
    /// Plain heap allocation; small buffers, serialized by copying the bytes.
    Heap(Vec<u8>),
    /// Anonymous shared memory; medium buffers.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Named memory-mapped file; large buffers, only the path + range are serialized.
    #[cfg(ipc)]
    MemMap(IpcMemMap),
}
/// Read-only memory map over a named file, shareable across processes by path.
#[cfg(ipc)]
struct IpcMemMap {
    /// Path of the mapped file.
    name: PathBuf,
    /// Byte range of interest inside the mapped file (carried through serialization).
    range: ops::Range<usize>,
    /// `true` when the file is user-provided (via `open_memmap*`) and must not be
    /// deleted on drop.
    is_custom: bool,
    /// The map itself; always `Some` until `Drop` takes it.
    map: Option<memmap2::Mmap>,
    /// Handle holding a shared file lock for the lifetime of the map; always `Some`
    /// until `Drop` takes it.
    read_handle: Option<fs::File>,
}
64impl fmt::Debug for IpcBytes {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 write!(f, "IpcBytes(<{} bytes>)", self.len())
67 }
68}
impl ops::Deref for IpcBytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        match &*self.0 {
            IpcBytesData::Heap(i) => i,
            #[cfg(ipc)]
            IpcBytesData::AnonMemMap(m) => m,
            // `map` is always `Some` until `IpcMemMap::drop` takes it, so this
            // unwrap cannot fail on a live value.
            //
            // NOTE(review): this returns the *full* mapping and ignores `f.range`,
            // so a map opened with a custom sub-range derefs to the whole file —
            // confirm whether `range` is serialization metadata only or should
            // slice the map here.
            #[cfg(ipc)]
            IpcBytesData::MemMap(f) => f.map.as_ref().unwrap(),
        }
    }
}
82
83impl IpcBytes {
84 pub fn empty() -> Self {
86 IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
87 }
88}
89impl IpcBytes {
91 pub async fn new_writer() -> IpcBytesWriter {
93 IpcBytesWriter {
94 inner: blocking::Unblock::new(Self::new_writer_blocking()),
95 }
96 }
97
98 pub async fn new_mut(len: usize) -> io::Result<IpcBytesMut> {
100 IpcBytesMut::new(len).await
101 }
102
103 pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
105 blocking::unblock(move || Self::from_vec_blocking(data)).await
106 }
107
108 pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
118 #[cfg(ipc)]
119 {
120 let (min, max) = iter.size_hint();
121 if let Some(max) = max {
122 if max <= Self::INLINE_MAX {
123 return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
124 } else if max == min {
125 let mut r = IpcBytes::new_mut(max).await?;
126 let mut actual_len = 0;
127 for (i, b) in r.iter_mut().zip(iter) {
128 *i = b;
129 actual_len += 1;
130 }
131 r.truncate(actual_len);
132 return r.finish().await;
133 }
134 }
135
136 let mut writer = Self::new_writer().await;
137 for b in iter {
138 writer.write_all(&[b]).await?;
139 }
140 writer.finish().await
141 }
142
143 #[cfg(not(ipc))]
144 {
145 Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
146 }
147 }
148
149 pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
151 #[cfg(ipc)]
152 {
153 Self::from_read_ipc(data).await
154 }
155 #[cfg(not(ipc))]
156 {
157 let mut data = data;
158 let mut buf = vec![];
159 data.read_to_end(&mut buf).await;
160 Self::from_vec(buf).await
161 }
162 }
163 #[cfg(ipc)]
164 async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
165 let mut buf = vec![0u8; Self::INLINE_MAX + 1];
166 let mut len = 0;
167
168 loop {
170 match data.read(&mut buf[len..]).await {
171 Ok(l) => {
172 if l == 0 {
173 buf.truncate(len);
175 return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
176 } else {
177 len += l;
178 if len == Self::INLINE_MAX + 1 {
179 break;
181 }
182 }
183 }
184 Err(e) => match e.kind() {
185 io::ErrorKind::WouldBlock => continue,
186 _ => return Err(e),
187 },
188 }
189 }
190
191 buf.resize(Self::UNNAMED_MAX + 1, 0);
193 loop {
194 match data.read(&mut buf[len..]).await {
195 Ok(l) => {
196 if l == 0 {
197 return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
199 } else {
200 len += l;
201 if len == Self::UNNAMED_MAX + 1 {
202 break;
204 }
205 }
206 }
207 Err(e) => match e.kind() {
208 io::ErrorKind::WouldBlock => continue,
209 _ => return Err(e),
210 },
211 }
212 }
213
214 Self::new_memmap(async |m| {
216 use futures_lite::AsyncWriteExt as _;
217
218 m.write_all(&buf).await?;
219 crate::io::copy(data, m).await?;
220 Ok(())
221 })
222 .await
223 }
224
225 pub async fn from_path(path: PathBuf) -> io::Result<Self> {
227 let file = crate::fs::File::open(path).await?;
228 Self::from_file(file).await
229 }
230 pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
232 #[cfg(ipc)]
233 {
234 let len = file.metadata().await?.len();
235 if len <= Self::UNNAMED_MAX as u64 {
236 let mut buf = vec![0u8; len as usize];
237 file.read_exact(&mut buf).await?;
238 Self::from_vec_blocking(buf)
239 } else {
240 Self::new_memmap(async move |m| {
241 crate::io::copy(&mut file, m).await?;
242 Ok(())
243 })
244 .await
245 }
246 }
247 #[cfg(not(ipc))]
248 {
249 let mut buf = vec![];
250 file.read_to_end(&mut buf).await?;
251 Self::from_vec_blocking(buf)
252 }
253 }
254
255 #[cfg(ipc)]
260 pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
261 let (name, file) = blocking::unblock(Self::create_memmap).await?;
262 let mut file = crate::fs::File::from(file);
263 write(&mut file).await?;
264
265 let mut permissions = file.metadata().await?.permissions();
266 permissions.set_readonly(true);
267 #[cfg(unix)]
268 {
269 use std::os::unix::fs::PermissionsExt;
270 permissions.set_mode(0o400);
271 }
272 file.set_permissions(permissions).await?;
273
274 blocking::unblock(move || {
275 drop(file);
276 let map = IpcMemMap::read(name, None)?;
277 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
278 })
279 .await
280 }
281
282 #[cfg(ipc)]
295 pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
296 blocking::unblock(move || {
297 unsafe { Self::open_memmap_blocking(file, range) }
299 })
300 .await
301 }
302
303 pub fn ptr_eq(&self, other: &Self) -> bool {
305 let a = &self[..];
306 let b = &other[..];
307 (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
308 }
309
310 #[cfg(ipc)]
311 const INLINE_MAX: usize = 64 * 1024; #[cfg(ipc)]
313 const UNNAMED_MAX: usize = 128 * 1024 * 1024; }
315
impl IpcBytes {
    /// Blocking counterpart of [`IpcBytes::new_writer`].
    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
        IpcBytesWriterBlocking {
            #[cfg(ipc)]
            heap_buf: vec![],
            #[cfg(ipc)]
            memmap: None,

            #[cfg(not(ipc))]
            heap_buf: std::io::Cursor::new(vec![]),
        }
    }

    /// Blocking counterpart of [`IpcBytes::new_mut`].
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMut> {
        IpcBytesMut::new_blocking(len)
    }

    /// New buffer copying `data`, storage tier chosen by length.
    pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
            } else if data.len() <= Self::UNNAMED_MAX {
                Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
            } else {
                Self::new_memmap_blocking(|m| m.write_all(data))
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
        }
    }

    /// New buffer taking ownership of `data`; only copies when it must move to
    /// shared storage.
    pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            if data.len() <= Self::INLINE_MAX {
                Ok(Self(Arc::new(IpcBytesData::Heap(data))))
            } else {
                Self::from_slice_blocking(&data)
            }
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(data))))
        }
    }

    /// Blocking counterpart of [`IpcBytes::from_iter`].
    pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            if let Some(max) = max {
                if max <= Self::INLINE_MAX {
                    return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
                } else if max == min {
                    // exact size known, fill a preallocated buffer directly
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, b) in r.iter_mut().zip(iter) {
                        *i = b;
                        actual_len += 1;
                    }
                    // the iterator may still yield fewer items than promised
                    r.truncate(actual_len);
                    return r.finish_blocking();
                }
            }

            // unknown size, stream through a spilling writer
            let mut writer = Self::new_writer_blocking();
            for b in iter {
                writer.write_all(&[b])?;
            }
            writer.finish()
        }
        #[cfg(not(ipc))]
        {
            Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
        }
    }

    /// Blocking counterpart of [`IpcBytes::from_read`].
    pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            Self::from_read_blocking_ipc(data)
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            data.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }
    /// Reads in stages, upgrading storage tier (heap -> anon shared memory ->
    /// memmap file) only when the input exceeds each limit.
    #[cfg(ipc)]
    fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
        let mut buf = vec![0u8; Self::INLINE_MAX + 1];
        let mut len = 0;

        // stage 1: read up to the inline limit
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        // EOF within the inline limit, keep on the heap
                        buf.truncate(len);
                        return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
                    } else {
                        len += l;
                        if len == Self::INLINE_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // stage 2: read up to the anonymous shared memory limit
        buf.resize(Self::UNNAMED_MAX + 1, 0);
        loop {
            match data.read(&mut buf[len..]) {
                Ok(l) => {
                    if l == 0 {
                        return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
                    } else {
                        len += l;
                        if len == Self::UNNAMED_MAX + 1 {
                            break;
                        }
                    }
                }
                Err(e) => match e.kind() {
                    io::ErrorKind::WouldBlock => continue,
                    _ => return Err(e),
                },
            }
        }

        // stage 3: too large for shared memory, stream into a memmap file;
        // `len == buf.len()` here so the full `buf` is flushed first
        Self::new_memmap_blocking(|m| {
            m.write_all(&buf)?;
            io::copy(data, m)?;
            Ok(())
        })
    }

    /// Blocking counterpart of [`IpcBytes::from_path`].
    pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
        let file = fs::File::open(path)?;
        Self::from_file_blocking(file)
    }
    /// Blocking counterpart of [`IpcBytes::from_file`]; uses the metadata
    /// length to pick the storage tier up front.
    pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let len = file.metadata()?.len();
            if len <= Self::UNNAMED_MAX as u64 {
                let mut buf = vec![0u8; len as usize];
                file.read_exact(&mut buf)?;
                Self::from_vec_blocking(buf)
            } else {
                Self::new_memmap_blocking(|m| {
                    io::copy(&mut file, m)?;
                    Ok(())
                })
            }
        }
        #[cfg(not(ipc))]
        {
            let mut buf = vec![];
            file.read_to_end(&mut buf)?;
            Self::from_vec_blocking(buf)
        }
    }

    /// Blocking counterpart of [`IpcBytes::new_memmap`]; fills a fresh file,
    /// freezes it read-only and re-opens it as a shared map.
    #[cfg(ipc)]
    pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
        let (name, mut file) = Self::create_memmap()?;
        write(&mut file)?;
        let mut permissions = file.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        file.set_permissions(permissions)?;

        // drop the write handle before taking the shared read lock
        drop(file);
        let map = IpcMemMap::read(name, None)?;
        Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
    }
    /// Creates a new uniquely named file under the per-process memmap cache dir.
    #[cfg(ipc)]
    fn create_memmap() -> io::Result<(PathBuf, fs::File)> {
        // counter doubles as "first call" flag for registering the exit cleanup
        static MEMMAP_DIR: Mutex<usize> = Mutex::new(0);
        let mut count = MEMMAP_DIR.lock();

        if *count == 0 {
            zng_env::on_process_exit(|_| {
                IpcBytes::cleanup_memmap_storage();
            });
        }

        let dir = zng_env::cache("zng-task-ipc-mem").join(std::process::id().to_string());
        fs::create_dir_all(&dir)?;
        let mut name = dir.join(count.to_string());
        if *count < usize::MAX {
            *count += 1;
        } else {
            // counter exhausted, fall back to scanning for a free name
            for i in 0..usize::MAX {
                name = dir.join(i.to_string());
                if !name.exists() {
                    break;
                }
            }
            if name.exists() {
                return Err(io::Error::new(io::ErrorKind::StorageFull, ""));
            }
        };

        let file = fs::OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .truncate(true)
            .open(&name)?;
        Ok((name, file))
    }
    /// Removes the memmap cache dirs on process exit.
    ///
    /// NOTE(review): this removes *every* subdir of the shared cache, not only
    /// this process's dir — presumably intended to also reap dirs left by
    /// crashed processes; confirm it cannot race other live processes.
    #[cfg(ipc)]
    fn cleanup_memmap_storage() {
        if let Ok(dir) = fs::read_dir(zng_env::cache("zng-task-ipc-mem")) {
            let entries: Vec<_> = dir.flatten().map(|e| e.path()).collect();
            for entry in entries {
                if entry.is_dir() {
                    fs::remove_dir_all(entry).ok();
                }
            }
        }
    }

    /// Maps an existing user-provided file, optionally restricted to `range`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee the file is not modified while mapped; see
    /// [`memmap2::Mmap::map`] for the underlying contract.
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&file)?;
        read_handle.lock_shared()?;
        // validate the requested range against the actual file length
        let len = read_handle.metadata()?.len();
        if let Some(range) = &range
            && len < range.end as u64
        {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "file length < range.end"));
        }
        // SAFETY: contract forwarded to the caller of this method.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(Self(Arc::new(IpcBytesData::MemMap(IpcMemMap {
            name: file,
            range,
            read_handle: Some(read_handle),
            // user-provided file, must not be deleted on drop
            is_custom: true,
            map: Some(map),
        }))))
    }
}
618
619impl AsRef<[u8]> for IpcBytes {
620 fn as_ref(&self) -> &[u8] {
621 &self[..]
622 }
623}
624impl Default for IpcBytes {
625 fn default() -> Self {
626 Self::empty()
627 }
628}
629impl PartialEq for IpcBytes {
630 fn eq(&self, other: &Self) -> bool {
631 self.ptr_eq(other) || self[..] == other[..]
632 }
633}
634impl Eq for IpcBytes {}
#[cfg(ipc)]
impl IpcMemMap {
    /// Opens and maps the file at `name`, holding a shared lock for the map's lifetime.
    ///
    /// `range` defaults to the full mapped length when `None`.
    fn read(name: PathBuf, range: Option<ops::Range<usize>>) -> io::Result<Self> {
        let read_handle = fs::File::open(&name)?;
        // shared lock signals other processes the file is still in use
        read_handle.lock_shared()?;
        // SAFETY: files created by this module are made read-only before
        // mapping; for custom files the contract is on `open_memmap*` callers.
        let map = unsafe { memmap2::Mmap::map(&read_handle) }?;

        let range = range.unwrap_or_else(|| 0..map.len());

        Ok(IpcMemMap {
            name,
            range,
            is_custom: false,
            read_handle: Some(read_handle),
            map: Some(map),
        })
    }
}
#[cfg(ipc)]
impl Serialize for IpcMemMap {
    /// Serializes only `(name, range)`; the receiving process re-opens and
    /// re-maps the file in the `Deserialize` impl. The bytes never cross the wire.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        (&self.name, self.range.clone()).serialize(serializer)
    }
}
#[cfg(ipc)]
impl<'de> Deserialize<'de> for IpcMemMap {
    /// Re-opens and re-maps the file identified by the serialized `(name, range)`.
    ///
    /// Fails with a custom error if the file cannot be opened/mapped in this process.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let (name, range) = <(PathBuf, ops::Range<usize>)>::deserialize(deserializer)?;
        IpcMemMap::read(name, Some(range)).map_err(|e| serde::de::Error::custom(format!("cannot load ipc memory map file, {e}")))
    }
}
#[cfg(ipc)]
impl Drop for IpcMemMap {
    fn drop(&mut self) {
        // unmap and release the shared lock before deleting the file
        self.map.take();
        self.read_handle.take();
        // user-provided files (`open_memmap*`) are not owned by us
        if !self.is_custom {
            std::fs::remove_file(&self.name).ok();
        }
    }
}
683
impl Serialize for IpcBytes {
    /// Serializes as an `IpcBytes` enum.
    ///
    /// In IPC mode (see `with_ipc_serialization`) the storage tier is preserved:
    /// heap bytes are copied, anon shared memory is sent as a handle, and memmap
    /// files are sent by path + a completion channel. Outside IPC mode the bytes
    /// are always flattened into the `Heap` variant.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        #[cfg(ipc)]
        {
            if is_ipc_serialization() {
                match &*self.0 {
                    IpcBytesData::Heap(b) => serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&b[..])),
                    IpcBytesData::AnonMemMap(b) => serializer.serialize_newtype_variant("IpcBytes", 1, "AnonMemMap", b),
                    IpcBytesData::MemMap(b) => {
                        // channel used by the receiver to signal it has mapped the file
                        let (sender, mut recv) = crate::channel::ipc_unbounded::<()>()
                            .map_err(|e| serde::ser::Error::custom(format!("cannot serialize memmap bytes for ipc, {e}")))?;

                        let r = serializer.serialize_newtype_variant("IpcBytes", 2, "MemMap", &(b, sender))?;
                        // keep a strong clone alive (so the file isn't deleted on drop)
                        // until the receiving process confirms it opened the map
                        let hold = self.clone();
                        crate::spawn_wait(move || {
                            if let Err(e) = recv.recv_blocking() {
                                tracing::error!("IpcBytes memmap completion signal not received, {e}")
                            }
                            drop(hold);
                        });
                        Ok(r)
                    }
                }
            } else {
                serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
            }
        }
        #[cfg(not(ipc))]
        {
            serializer.serialize_newtype_variant("IpcBytes", 0, "Heap", serde_bytes::Bytes::new(&self[..]))
        }
    }
}
impl<'de> Deserialize<'de> for IpcBytes {
    /// Mirrors the `Serialize` impl: decodes the `IpcBytes` enum, re-attaching
    /// shared memory / memmap storage for the IPC variants.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // variant discriminant; indexes must match the serializer's 0/1/2
        #[derive(Deserialize)]
        enum VariantId {
            Heap,
            #[cfg(ipc)]
            AnonMemMap,
            #[cfg(ipc)]
            MemMap,
        }

        struct EnumVisitor;
        impl<'de> serde::de::Visitor<'de> for EnumVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "IpcBytes variant")
            }

            fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
            where
                A: serde::de::EnumAccess<'de>,
            {
                let (variant, access) = data.variant::<VariantId>()?;
                match variant {
                    VariantId::Heap => access.newtype_variant_seed(ByteSliceVisitor),
                    #[cfg(ipc)]
                    VariantId::AnonMemMap => Ok(IpcBytes(Arc::new(IpcBytesData::AnonMemMap(access.newtype_variant()?)))),
                    #[cfg(ipc)]
                    VariantId::MemMap => {
                        let (memmap, mut completion_sender): (IpcMemMap, crate::channel::IpcSender<()>) = access.newtype_variant()?;
                        // tell the sender we mapped the file, so it can release
                        // its keep-alive clone (see the Serialize impl)
                        completion_sender.send_blocking(()).map_err(|e| {
                            serde::de::Error::custom(format!("cannot deserialize memmap bytes, completion signal failed, {e}"))
                        })?;
                        Ok(IpcBytes(Arc::new(IpcBytesData::MemMap(memmap))))
                    }
                }
            }
        }
        // decodes the `Heap` payload from any of serde's byte representations
        struct ByteSliceVisitor;
        impl<'de> serde::de::Visitor<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "byte buffer")
            }

            fn visit_borrowed_bytes<E>(self, v: &'de [u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                IpcBytes::from_slice_blocking(v).map_err(serde::de::Error::custom)
            }

            fn visit_byte_buf<E>(self, v: Vec<u8>) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                // owned buffer, can be taken without copying
                IpcBytes::from_vec_blocking(v).map_err(serde::de::Error::custom)
            }
        }
        impl<'de> serde::de::DeserializeSeed<'de> for ByteSliceVisitor {
            type Value = IpcBytes;

            fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                deserializer.deserialize_bytes(ByteSliceVisitor)
            }
        }

        #[cfg(ipc)]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap", "AnonMemMap", "MemMap"], EnumVisitor)
        }
        #[cfg(not(ipc))]
        {
            deserializer.deserialize_enum("IpcBytes", &["Heap"], EnumVisitor)
        }
    }
}
814
/// Runs `serialize` with IPC serialization enabled for the current thread.
///
/// Inside the closure, [`IpcBytes`] serializes its storage handles instead of
/// flattening to heap bytes. The previous flag value is restored on exit, even
/// on panic.
#[cfg(ipc)]
pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
    let previous = IPC_SERIALIZATION.replace(true);
    let _restore = RunOnDrop::new(move || IPC_SERIALIZATION.set(previous));
    serialize()
}
828
/// Returns `true` while inside a [`with_ipc_serialization`] closure on this thread.
#[cfg(ipc)]
pub fn is_ipc_serialization() -> bool {
    IPC_SERIALIZATION.with(|flag| flag.get())
}
834
#[cfg(ipc)]
thread_local! {
    // per-thread flag toggled by `with_ipc_serialization`, read by the
    // `Serialize for IpcBytes` impl
    static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
}
839
840impl IpcBytes {
841 pub fn downgrade(&self) -> WeakIpcBytes {
845 WeakIpcBytes(Arc::downgrade(&self.0))
846 }
847}
848
/// Weak reference to an [`IpcBytes`], created by [`IpcBytes::downgrade`].
pub struct WeakIpcBytes(Weak<IpcBytesData>);
impl WeakIpcBytes {
    /// Attempts to get a strong reference; `None` if all strong refs were dropped.
    pub fn upgrade(&self) -> Option<IpcBytes> {
        self.0.upgrade().map(IpcBytes)
    }

    /// Count of strong references currently alive.
    pub fn strong_count(&self) -> usize {
        self.0.strong_count()
    }
}
862
/// Async writer that produces an [`IpcBytes`], created by [`IpcBytes::new_writer`].
///
/// Wraps the blocking writer in `blocking::Unblock` so writes run off the async executor.
pub struct IpcBytesWriter {
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
impl IpcBytesWriter {
    /// Flushes and converts the written bytes into an immutable [`IpcBytes`].
    pub async fn finish(self) -> std::io::Result<IpcBytes> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish()).await
    }

    /// Flushes and converts the written bytes into a mutable [`IpcBytesMut`].
    pub async fn finish_mut(self) -> std::io::Result<IpcBytesMut> {
        let inner = self.inner.into_inner().await;
        blocking::unblock(move || inner.finish_mut()).await
    }
}
// straight delegation to the `Unblock` wrapper around the blocking writer
impl crate::io::AsyncWrite for IpcBytesWriter {
    fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
        crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
        crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
        crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
    }
}
// delegation; note that seeking forces the blocking writer to spill to a file
impl crate::io::AsyncSeek for IpcBytesWriter {
    fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
        crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
    }
}
900
/// Blocking writer that produces an [`IpcBytes`], created by
/// [`IpcBytes::new_writer_blocking`].
pub struct IpcBytesWriterBlocking {
    /// In-memory buffer; once `memmap` is set it only stages bytes between flushes.
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    /// Spill file, allocated when the written length exceeds `IpcBytes::UNNAMED_MAX`
    /// or on the first seek.
    #[cfg(ipc)]
    memmap: Option<(PathBuf, std::fs::File)>,

    /// Without IPC everything stays on the heap; `Cursor` provides `Seek`.
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
impl IpcBytesWriterBlocking {
    /// Flushes and converts into an immutable [`IpcBytes`].
    pub fn finish(self) -> std::io::Result<IpcBytes> {
        let m = self.finish_mut()?;
        m.finish_blocking()
    }

    /// Flushes and converts into a mutable [`IpcBytesMut`], preserving the
    /// storage tier the writer ended up in.
    pub fn finish_mut(mut self) -> std::io::Result<IpcBytesMut> {
        self.flush()?;
        #[cfg(ipc)]
        {
            let (len, inner) = match self.memmap {
                Some((name, write_handle)) => {
                    // SAFETY: the file is exclusively locked by this writer
                    // (see `alloc_memmap_file`), so mapping it mutably is sound.
                    let map = unsafe { memmap2::MmapMut::map_mut(&write_handle) }?;
                    let len = map.len();
                    (len, IpcBytesMutInner::MemMap { name, map, write_handle })
                }
                None => {
                    // never spilled; pick heap vs anon shared memory by size
                    let len = self.heap_buf.len();
                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    };
                    (len, i)
                }
            };
            Ok(IpcBytesMut { len, inner })
        }
        #[cfg(not(ipc))]
        {
            let heap_buf = self.heap_buf.into_inner();
            let len = heap_buf.len();
            let inner = IpcBytesMutInner::Heap(heap_buf);
            Ok(IpcBytesMut { len, inner })
        }
    }

    /// Lazily creates the spill file and drains the heap buffer into it.
    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while writing; downgraded/reopened on finish
            file.lock()?;
            #[cfg(unix)]
            {
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            self.memmap = Some((name, file));
        }
        let file = &mut self.memmap.as_mut().unwrap().1;

        // move the staged heap bytes into the file
        file.write_all(&self.heap_buf)?;
        self.heap_buf.clear();
        Ok(())
    }
}
impl std::io::Write for IpcBytesWriterBlocking {
    fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
        #[cfg(ipc)]
        {
            if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
                // exceeds the shared-memory limit, spill to a file
                // (also drains the current heap buffer into it)
                self.alloc_memmap_file()?;

                if write_buf.len() > IpcBytes::UNNAMED_MAX {
                    // huge write, bypass staging entirely
                    self.memmap.as_mut().unwrap().1.write_all(write_buf)?;
                } else {
                    // stage in the (now empty) heap buffer; flushed later
                    self.heap_buf.extend_from_slice(write_buf);
                }
            } else {
                if self.memmap.is_none() {
                    // amortized growth, capped at the shared-memory limit
                    self.heap_buf
                        .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
                }
                self.heap_buf.extend_from_slice(write_buf);
            }

            // all bytes are always accepted
            Ok(write_buf.len())
        }

        #[cfg(not(ipc))]
        {
            std::io::Write::write(&mut self.heap_buf, write_buf)
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        // only needs to do work after spilling to a file
        #[cfg(ipc)]
        if let Some((_, file)) = &mut self.memmap {
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.flush()?;
        }
        Ok(())
    }
}
impl std::io::Seek for IpcBytesWriterBlocking {
    /// Seeking forces the writer to spill to a file (heap staging cannot seek).
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        #[cfg(ipc)]
        {
            self.alloc_memmap_file()?;
            let (_, file) = self.memmap.as_mut().unwrap();
            // staged bytes must land at their original position before seeking
            if !self.heap_buf.is_empty() {
                file.write_all(&self.heap_buf)?;
                self.heap_buf.clear();
            }
            file.seek(pos)
        }
        #[cfg(not(ipc))]
        {
            std::io::Seek::seek(&mut self.heap_buf, pos)
        }
    }
}
1037
/// Backing storage for [`IpcBytesMut`]; mirrors `IpcBytesData` but writable.
enum IpcBytesMutInner {
    /// Plain heap allocation for small buffers.
    Heap(Vec<u8>),
    /// Anonymous shared memory for medium buffers.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// Mutable memory-mapped file for large buffers.
    #[cfg(ipc)]
    MemMap {
        /// Path of the backing file, deleted on drop (see `Drop for IpcBytesMut`).
        name: PathBuf,
        map: memmap2::MmapMut,
        /// Exclusively locked handle kept for resizing/permission changes in `finish`.
        write_handle: std::fs::File,
    },
}
1049
/// Mutable, fixed-capacity byte buffer that converts into an [`IpcBytes`] via `finish*`.
pub struct IpcBytesMut {
    inner: IpcBytesMutInner,
    /// Logical length; can be less than the allocated capacity after `truncate`.
    len: usize,
}
impl ops::Deref for IpcBytesMut {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        // every arm clamps to the logical length, not the allocation size
        let len = self.len;
        match &self.inner {
            IpcBytesMutInner::Heap(v) => &v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { map, .. } => &map[..len],
        }
    }
}
1071impl ops::DerefMut for IpcBytesMut {
1072 fn deref_mut(&mut self) -> &mut Self::Target {
1073 let len = self.len;
1074 match &mut self.inner {
1075 IpcBytesMutInner::Heap(v) => &mut v[..len],
1076 #[cfg(ipc)]
1077 IpcBytesMutInner::AnonMemMap(m) => {
1078 unsafe { m.deref_mut() }
1080 }
1081 #[cfg(ipc)]
1082 IpcBytesMutInner::MemMap { map, .. } => &mut map[..len],
1083 }
1084 }
1085}
1086impl fmt::Debug for IpcBytesMut {
1087 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1088 write!(f, "IpcBytesMut(<{} bytes>)", self.len())
1089 }
1090}
impl IpcBytesMut {
    /// New zeroed buffer of `len` bytes, storage tier chosen by `len`.
    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // memmap file creation does blocking filesystem work
            blocking::unblock(move || Self::new_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Blocking counterpart of [`IpcBytesMut::new`].
    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            let (name, file) = IpcBytes::create_memmap()?;
            // exclusive lock while the buffer is writable
            file.lock()?;
            #[cfg(unix)]
            {
                let mut permissions = file.metadata()?.permissions();
                use std::os::unix::fs::PermissionsExt;
                permissions.set_mode(0o600);
                file.set_permissions(permissions)?;
            }
            file.set_len(len as u64)?;
            // SAFETY: the file is exclusively locked by this handle, so the
            // mutable mapping is the only access to its contents.
            let map = unsafe { memmap2::MmapMut::map_mut(&file) }?;
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap {
                    name,
                    map,
                    write_handle: file,
                },
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// New buffer taking ownership of `buf`; copies only when it must move to
    /// shared storage.
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            blocking::unblock(move || {
                let mut b = IpcBytes::new_mut_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// New mutable copy of an immutable [`IpcBytes`].
    pub async fn from_bytes(bytes: IpcBytes) -> io::Result<Self> {
        blocking::unblock(move || Self::from_bytes_blocking(bytes)).await
    }

    /// Converts into an immutable [`IpcBytes`], shrinking storage to the
    /// logical length where possible.
    pub async fn finish(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        // take `inner` so the `Drop` impl sees a plain heap vec and does nothing
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                v.shrink_to_fit();
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                // re-tier by the (possibly truncated) logical length
                if len < IpcBytes::INLINE_MAX {
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => {
                let len = self.len;
                // file truncation/permission work is blocking
                blocking::unblock(move || Self::finish_memmap(name, map, write_handle, len)).await?
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }

    /// Truncates the file to `len`, remaps it read-only and swaps the exclusive
    /// write lock for a shared read lock.
    #[cfg(ipc)]
    fn finish_memmap(name: PathBuf, map: memmap2::MmapMut, write_handle: fs::File, len: usize) -> Result<IpcBytesData, io::Error> {
        let alloc_len = map.len();
        if alloc_len != len {
            write_handle.set_len(len as u64)?;
        }
        write_handle.unlock()?;
        let map = if alloc_len != len {
            // length changed, the old mapping is stale; remap read-only
            drop(map);
            // SAFETY: the file is frozen read-only below and shared-locked;
            // same invariant as `IpcMemMap::read`.
            unsafe { memmap2::Mmap::map(&write_handle) }?
        } else {
            map.make_read_only()?
        };
        let mut permissions = write_handle.metadata()?.permissions();
        permissions.set_readonly(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            permissions.set_mode(0o400);
        }
        write_handle.set_permissions(permissions)?;
        drop(write_handle);
        let read_handle = std::fs::File::open(&name)?;
        read_handle.lock_shared()?;
        Ok(IpcBytesData::MemMap(IpcMemMap {
            name,
            range: 0..len,
            is_custom: false,
            map: Some(map),
            read_handle: Some(read_handle),
        }))
    }
}
impl IpcBytesMut {
    /// Blocking counterpart of [`IpcBytesMut::from_vec`].
    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
            b[..].copy_from_slice(&buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// New mutable buffer copying `buf`, storage tier chosen by length.
    pub fn from_slice_blocking(buf: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf.to_vec()),
            })
        } else {
            let mut b = IpcBytes::new_mut_blocking(buf.len())?;
            b[..].copy_from_slice(buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf.to_vec()),
            })
        }
    }

    /// Blocking counterpart of [`IpcBytesMut::from_bytes`].
    ///
    /// Reclaims the heap vec without copying when `bytes` is the only reference.
    pub fn from_bytes_blocking(bytes: IpcBytes) -> io::Result<Self> {
        #[cfg_attr(not(ipc), allow(irrefutable_let_patterns))]
        if let IpcBytesData::Heap(_) = &*bytes.0 {
            match Arc::try_unwrap(bytes.0) {
                Ok(r) => match r {
                    IpcBytesData::Heap(r) => Ok(Self {
                        len: r.len(),
                        inner: IpcBytesMutInner::Heap(r),
                    }),
                    // the variant was checked before the unwrap
                    _ => unreachable!(),
                },
                // other references exist, must copy
                Err(a) => Self::from_slice_blocking(&IpcBytes(a)[..]),
            }
        } else {
            // shared memory / memmap storage is read-only, always copy
            Self::from_slice_blocking(&bytes[..])
        }
    }

    /// Blocking counterpart of [`IpcBytesMut::finish`].
    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        // take `inner` so the `Drop` impl sees a plain heap vec and does nothing
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                // re-tier by the (possibly truncated) logical length
                if len < IpcBytes::INLINE_MAX {
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap { name, map, write_handle } => Self::finish_memmap(name, map, write_handle, len)?,
        };
        Ok(IpcBytes(Arc::new(data)))
    }
}
#[cfg(ipc)]
impl Drop for IpcBytesMut {
    fn drop(&mut self) {
        // a buffer dropped without `finish*` owns its spill file: unmap, unlock
        // (via dropping the handle) and delete it
        if let IpcBytesMutInner::MemMap { name, map, write_handle } = std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            drop(map);
            drop(write_handle);
            std::fs::remove_file(name).ok();
        }
    }
}
1352
/// Typed view over an [`IpcBytesMut`], reinterpreting the bytes as a `[T]` slice
/// via `bytemuck`.
pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
    bytes: IpcBytesMut,
    // marks the element type without storing any `T`
    _t: PhantomData<T>,
}
impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // panics on misaligned/odd-length buffers; `IpcBytesMut::cast` asserts
        // this once at construction
        bytemuck::cast_slice::<u8, T>(&self.bytes)
    }
}
// mutable access additionally requires `NoUninit` so writes cannot introduce
// padding bytes into the shared buffer
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
    }
}
1372impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
1373 pub fn into_inner(self) -> IpcBytesMut {
1375 self.bytes
1376 }
1377}
1378impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
1379 fn from(value: IpcBytesMutCast<T>) -> Self {
1380 value.bytes
1381 }
1382}
1383impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
1384 pub async fn new(len: usize) -> io::Result<Self> {
1386 IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
1387 }
1388
1389 pub fn new_blocking(len: usize) -> io::Result<Self> {
1391 IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
1392 }
1393
1394 pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
1396 IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
1397 }
1398
1399 pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
1401 IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
1402 }
1403
1404 pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
1406 IpcBytesMut::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytesMut::cast)
1407 }
1408
1409 pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
1411 &mut self.bytes
1412 }
1413
1414 pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
1416 self.bytes.finish().await.map(IpcBytes::cast)
1417 }
1418
1419 pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
1421 self.bytes.finish_blocking().map(IpcBytes::cast)
1422 }
1423}
1424
1425impl IpcBytesMut {
1426 pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
1436 let r = IpcBytesMutCast {
1437 bytes: self,
1438 _t: PhantomData,
1439 };
1440 let _assert = &r[..];
1441 r
1442 }
1443
1444 pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1452 bytemuck::cast_slice(self)
1453 }
1454
1455 pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
1463 bytemuck::cast_slice_mut(self)
1464 }
1465}
1466
/// Immutable shared byte buffer viewed as a slice of `T`.
///
/// Created by [`IpcBytes::cast`]; the `Deref` impl reinterprets the raw bytes
/// with `bytemuck::cast_slice`. Clones share the same underlying buffer.
pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
    // the untyped shared buffer
    bytes: IpcBytes,
    // marks the item type without storing any `T`
    _t: PhantomData<T>,
}
1474impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
1475 fn default() -> Self {
1476 Self {
1477 bytes: Default::default(),
1478 _t: PhantomData,
1479 }
1480 }
1481}
1482impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
1483 type Target = [T];
1484
1485 fn deref(&self) -> &Self::Target {
1486 bytemuck::cast_slice::<u8, T>(&self.bytes)
1487 }
1488}
1489impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
1490 pub fn into_inner(self) -> IpcBytes {
1492 self.bytes
1493 }
1494}
1495impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
1496 fn from(value: IpcBytesCast<T>) -> Self {
1497 value.bytes
1498 }
1499}
1500impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
1501 fn clone(&self) -> Self {
1502 Self {
1503 bytes: self.bytes.clone(),
1504 _t: PhantomData,
1505 }
1506 }
1507}
1508impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
1509 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1510 write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
1511 }
1512}
1513impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
1514 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
1515 where
1516 S: serde::Serializer,
1517 {
1518 self.bytes.serialize(serializer)
1519 }
1520}
1521impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
1522 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
1523 where
1524 D: serde::Deserializer<'de>,
1525 {
1526 let bytes = IpcBytes::deserialize(deserializer)?;
1527 Ok(bytes.cast())
1528 }
1529}
1530impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
1531 fn eq(&self, other: &Self) -> bool {
1532 self.bytes == other.bytes
1533 }
1534}
1535impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
    /// Allocates a mutable typed buffer with space for `len` items.
    pub async fn new_mut(len: usize) -> io::Result<IpcBytesMutCast<T>> {
        IpcBytesMut::new(len * size_of::<T>()).await.map(IpcBytesMut::cast)
    }

    /// Blocking version of [`Self::new_mut`].
    pub fn new_mut_blocking(len: usize) -> io::Result<IpcBytesMutCast<T>> {
        IpcBytesMut::new_blocking(len * size_of::<T>()).map(IpcBytesMut::cast)
    }

    /// Allocates a buffer initialized with `data`.
    pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
        IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
    }

    /// Allocates a buffer initialized with the items of `iter`.
    ///
    /// The iterator `size_hint` selects the strategy: small bounded sizes are
    /// collected on the heap, large exact sizes are written directly into a
    /// preallocated buffer, anything else streams through a growing writer.
    pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            // convert item hints to byte hints
            let l = size_of::<T>();
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    // small enough to inline, collect on the heap
                    return Self::from_vec(iter.collect()).await;
                } else if max == min {
                    // exact size known, write directly into a preallocated buffer
                    let mut r = IpcBytes::new_mut(max).await?;
                    let mut actual_len = 0;
                    for (i, f) in r.chunks_exact_mut(l).zip(iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // size hints are not guaranteed exact, keep only what was produced
                    r.truncate(actual_len * l);
                    return r.finish().await.map(IpcBytes::cast);
                }
            }

            // size unknown or unbounded, stream through a growing writer
            let mut writer = IpcBytes::new_writer().await;
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f)).await?;
            }
            writer.finish().await.map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            Self::from_vec(iter.collect()).await
        }
    }

    /// Blocking version of [`Self::from_vec`].
    pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
        IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
    }

    /// Allocates a buffer initialized with a copy of `data`.
    pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
        IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
    }

    /// Blocking version of [`Self::from_iter`].
    pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
        #[cfg(ipc)]
        {
            let (min, max) = iter.size_hint();
            // convert item hints to byte hints
            let l = size_of::<T>();
            let min = min * l;
            let max = max.map(|m| m * l);
            if let Some(max) = max {
                if max <= IpcBytes::INLINE_MAX {
                    // small enough to inline, collect on the heap
                    return Self::from_vec_blocking(iter.collect());
                } else if max == min {
                    // exact size known, write directly into a preallocated buffer
                    let mut r = IpcBytes::new_mut_blocking(max)?;
                    let mut actual_len = 0;
                    for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
                        i.copy_from_slice(bytemuck::bytes_of(&f));
                        actual_len += 1;
                    }
                    // size hints are not guaranteed exact, keep only what was produced
                    r.truncate(actual_len * l);
                    return r.finish_blocking().map(IpcBytes::cast);
                }
            }

            // size unknown or unbounded, stream through a growing writer
            let mut writer = IpcBytes::new_writer_blocking();
            for f in iter {
                writer.write_all(bytemuck::bytes_of(&f))?;
            }
            writer.finish().map(IpcBytes::cast)
        }
        #[cfg(not(ipc))]
        {
            Self::from_vec_blocking(iter.collect())
        }
    }

    /// Reference to the untyped bytes.
    pub fn as_bytes(&self) -> &IpcBytes {
        &self.bytes
    }
}
1653
1654impl IpcBytes {
1655 pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
1665 let r = IpcBytesCast {
1666 bytes: self,
1667 _t: PhantomData,
1668 };
1669 let _assert = &r[..];
1670 r
1671 }
1672
1673 pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
1681 bytemuck::cast_slice(self)
1682 }
1683}
1684
1685impl IpcBytesMut {
1686 pub fn truncate(&mut self, new_len: usize) {
1692 self.len = self.len.min(new_len);
1693 }
1694
1695 pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
1705 assert!(L1 <= L0);
1706
1707 let self_ = &mut self[..];
1708
1709 let len = self_.len();
1710 if len == 0 {
1711 return;
1712 }
1713 assert!(len.is_multiple_of(L0), "length must be multiple of L0");
1714
1715 let ptr = self_.as_mut_ptr();
1716 let mut write = 0usize;
1717 let mut read = 0usize;
1718
1719 unsafe {
1721 while read < len {
1722 let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
1723 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
1724 read += L0;
1725
1726 let out_chunk = reduce(in_chunk.assume_init());
1727
1728 std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
1729 write += L1;
1730 }
1731 }
1732
1733 self.truncate(write);
1734 }
1735
1736 pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
1746 assert!(out_chunk_buf.len() < in_chunk_len);
1747
1748 let self_ = &mut self[..];
1749
1750 let len = self_.len();
1751 if len == 0 {
1752 return;
1753 }
1754 assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
1755
1756 let ptr = self_.as_mut_ptr();
1757 let mut write = 0usize;
1758 let mut read = 0usize;
1759
1760 unsafe {
1762 while read < len {
1763 reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
1764 read += in_chunk_len;
1765
1766 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
1767 write += out_chunk_buf.len();
1768 }
1769 }
1770
1771 self.truncate(write);
1772 }
1773
1774 pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
1784 where
1785 T0: bytemuck::AnyBitPattern,
1786 {
1787 let l0 = std::mem::size_of::<T0>() * L0;
1788 let l1 = std::mem::size_of::<T1>() * L1;
1789 assert!(l1 <= l0);
1790
1791 let self_ = &mut self[..];
1792
1793 let len = self_.len();
1794 if len == 0 {
1795 return;
1796 }
1797 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
1798
1799 let ptr = self_.as_mut_ptr();
1800 let mut write = 0usize;
1801 let mut read = 0usize;
1802
1803 unsafe {
1807 while read < len {
1808 let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
1809 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
1810 read += l0;
1811
1812 let out_chunk = reduce(in_chunk.assume_init());
1813
1814 std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
1815 write += l1;
1816 }
1817 }
1818
1819 self.truncate(write);
1820 }
1821
1822 pub fn cast_reduce_in_place_dyn<T0, T1>(
1834 &mut self,
1835 in_chunk_len: usize,
1836 out_chunk_buf: &mut [T1],
1837 mut reduce: impl FnMut(&[T0], &mut [T1]),
1838 ) where
1839 T0: bytemuck::AnyBitPattern,
1840 {
1841 let l0 = std::mem::size_of::<T0>() * in_chunk_len;
1842 let l1 = std::mem::size_of_val(out_chunk_buf);
1843
1844 assert!(l1 <= l0);
1845
1846 let self_ = &mut self[..];
1847
1848 let len = self_.len();
1849 if len == 0 {
1850 return;
1851 }
1852 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
1853
1854 let ptr = self_.as_mut_ptr();
1855 let mut write = 0usize;
1856 let mut read = 0usize;
1857
1858 unsafe {
1860 while read < len {
1861 reduce(
1862 bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
1863 &mut *out_chunk_buf,
1864 );
1865 read += l0;
1866
1867 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
1868 write += l1;
1869 }
1870 }
1871
1872 self.truncate(write);
1873 }
1874
1875 pub fn reverse_chunks<const L: usize>(&mut self) {
1883 let self_ = &mut self[..];
1884
1885 let len = self_.len();
1886
1887 if len == 0 || L == 0 {
1888 return;
1889 }
1890
1891 if L == 1 {
1892 return self_.reverse();
1893 }
1894
1895 assert!(len.is_multiple_of(L), "length must be multiple of L");
1896
1897 unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
1899 }
1900
1901 pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
1907 let self_ = &mut self[..];
1908
1909 let len = self_.len();
1910
1911 if len == 0 || chunk_len == 0 {
1912 return;
1913 }
1914
1915 if chunk_len == 1 {
1916 return self_.reverse();
1917 }
1918
1919 assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
1920
1921 let mut a = 0;
1922 let mut b = len - chunk_len;
1923
1924 let ptr = self_.as_mut_ptr();
1925
1926 unsafe {
1928 while a < b {
1929 std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
1930 a += chunk_len;
1931 b -= chunk_len;
1932 }
1933 }
1934 }
1935}
1936
// alias needed because `self_cell!` requires a single-lifetime-parameter dependent type
type SliceIter<'a> = std::slice::Iter<'a, u8>;
// self-referential cell that owns the `IpcBytes` and the slice iterator borrowing
// from it; this is what lets `IpcBytesIntoIter` be an owning iterator
self_cell::self_cell! {
    struct IpcBytesIntoIterInner {
        owner: IpcBytes,
        #[covariant]
        dependent: SliceIter,
    }
}
1948
/// Owning byte iterator for [`IpcBytes`].
pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
impl IpcBytesIntoIter {
    fn new(bytes: IpcBytes) -> Self {
        Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
    }

    /// The source bytes, including the bytes already yielded.
    pub fn source(&self) -> &IpcBytes {
        self.0.borrow_owner()
    }

    /// The bytes not yet yielded.
    pub fn rest(&self) -> &[u8] {
        self.0.borrow_dependent().as_slice()
    }
}
impl Iterator for IpcBytesIntoIter {
    type Item = u8;

    fn next(&mut self) -> Option<u8> {
        self.0.with_dependent_mut(|_, d| d.next().copied())
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // slice iterators have an exact size hint
        self.0.borrow_dependent().size_hint()
    }

    fn count(self) -> usize
    where
        Self: Sized,
    {
        // the remaining count is just the remaining slice length
        self.0.borrow_dependent().as_slice().len()
    }

    fn nth(&mut self, n: usize) -> Option<u8> {
        self.0.with_dependent_mut(|_, d| d.nth(n).copied())
    }

    fn last(mut self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        // the last remaining byte is the first from the back
        self.next_back()
    }
}
impl DoubleEndedIterator for IpcBytesIntoIter {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.0.with_dependent_mut(|_, d| d.next_back().copied())
    }

    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
        self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
    }
}
2004impl FusedIterator for IpcBytesIntoIter {}
2005impl Default for IpcBytesIntoIter {
2006 fn default() -> Self {
2007 IpcBytes::empty().into_iter()
2008 }
2009}
2010impl IntoIterator for IpcBytes {
2011 type Item = u8;
2012
2013 type IntoIter = IpcBytesIntoIter;
2014
2015 fn into_iter(self) -> Self::IntoIter {
2016 IpcBytesIntoIter::new(self)
2017 }
2018}
2019
/// Owning typed iterator for [`IpcBytesCast`].
pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
    fn new(bytes: IpcBytesCast<T>) -> Self {
        // the clone is cheap (shared `Arc`); the second field only backs `source`
        Self(bytes.bytes.clone().into_iter(), bytes)
    }

    /// The source buffer, including the items already yielded.
    pub fn source(&self) -> &IpcBytesCast<T> {
        &self.1
    }

    /// The items not yet yielded.
    pub fn rest(&self) -> &[T] {
        bytemuck::cast_slice(self.0.rest())
    }
}
impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        let size = size_of::<T>();
        // read one item from the front of the remaining bytes
        let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
        // advance the byte iterator past the item just read
        self.0.nth(size - 1);
        Some(r)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // byte hints converted to item hints
        let (mut min, mut max) = self.0.size_hint();
        min /= size_of::<T>();
        if let Some(max) = &mut max {
            *max /= size_of::<T>();
        }
        (min, max)
    }

    fn nth(&mut self, n: usize) -> Option<T> {
        let size = size_of::<T>();

        // checked math so absurd `n` values return `None` instead of overflowing
        let byte_skip = n.checked_mul(size)?;
        let byte_end = byte_skip.checked_add(size)?;

        let bytes = self.0.rest().get(byte_skip..byte_end)?;
        let r = *bytemuck::from_bytes(bytes);

        // consume the skipped items and the returned item
        // NOTE(review): when `n` is out of range this returns `None` above without
        // consuming the remainder; `Iterator::nth` impls usually exhaust — confirm intended
        self.0.nth(byte_end - 1);

        Some(r)
    }

    fn last(mut self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        // the last remaining item is the first from the back
        self.next_back()
    }
}
impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
    fn next_back(&mut self) -> Option<T> {
        let size = size_of::<T>();

        let len = self.0.rest().len();
        if len < size {
            // not enough bytes left for a full item
            return None;
        }

        // read one item from the back of the remaining bytes
        let start = len - size;
        let bytes = &self.0.rest()[start..];
        let r = *bytemuck::from_bytes(bytes);

        // retreat the byte iterator past the item just read
        self.0.nth_back(size - 1);

        Some(r)
    }

    fn nth_back(&mut self, n: usize) -> Option<T> {
        let size = size_of::<T>();

        // checked math so absurd `n` values return `None` instead of overflowing
        let rev_byte_skip = n.checked_mul(size)?;
        let rev_byte_end = rev_byte_skip.checked_add(size)?;
        let len = self.0.rest().len();

        if len < rev_byte_end {
            // not enough bytes left to skip `n` items and read one
            return None;
        }

        // item position counted from the back
        let start = len - rev_byte_end;
        let end = len - rev_byte_skip;

        let bytes = &self.0.rest()[start..end];
        let r = *bytemuck::from_bytes(bytes);

        // consume the skipped items and the returned item from the back
        self.0.nth_back(rev_byte_end - 1);

        Some(r)
    }
}
2117impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
2118impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
2119 fn default() -> Self {
2120 IpcBytes::empty().cast::<T>().into_iter()
2121 }
2122}
2123impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
2124 type Item = T;
2125
2126 type IntoIter = IpcBytesCastIntoIter<T>;
2127
2128 fn into_iter(self) -> Self::IntoIter {
2129 IpcBytesCastIntoIter::new(self)
2130 }
2131}