1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 fmt,
5 io::{self, Write as _},
6 mem::MaybeUninit,
7 ops,
8 path::PathBuf,
9 pin::Pin,
10 sync::Arc,
11};
12
13#[cfg(ipc)]
14use ipc_channel::ipc::IpcSharedMemory;
15
16use crate::channel::IpcBytes;
17use crate::channel::ipc_bytes::IpcBytesData;
18#[cfg(ipc)]
19use crate::channel::ipc_bytes_memmap::MemmapMut;
20
/// Backing storage for [`IpcBytesMut`], selected by payload size.
enum IpcBytesMutInner {
    /// Plain heap allocation; used for small payloads (and always when `ipc` is off).
    Heap(Vec<u8>),
    /// Anonymous shared-memory segment for mid-sized payloads.
    #[cfg(ipc)]
    AnonMemMap(IpcSharedMemory),
    /// File-backed writable memory map for large payloads.
    #[cfg(ipc)]
    MemMap(MemmapMut),
}
28
/// Mutable byte buffer that can be converted into an immutable [`IpcBytes`].
pub struct IpcBytesMut {
    // Backing storage; chosen by size in `new`/`new_blocking`.
    inner: IpcBytesMutInner,
    // Logical length; may be smaller than the backing storage after `truncate`.
    len: usize,
}
impl ops::Deref for IpcBytesMut {
    type Target = [u8];

    /// Borrows exactly `self.len` bytes, regardless of the capacity of the
    /// backing storage (which can be larger after `truncate`).
    fn deref(&self) -> &Self::Target {
        let len = self.len;
        match &self.inner {
            IpcBytesMutInner::Heap(v) => &v[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => &m[..len],
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap(m) => &m[..len],
        }
    }
}
51impl ops::DerefMut for IpcBytesMut {
52 fn deref_mut(&mut self) -> &mut Self::Target {
53 let len = self.len;
54 match &mut self.inner {
55 IpcBytesMutInner::Heap(v) => &mut v[..len],
56 #[cfg(ipc)]
57 IpcBytesMutInner::AnonMemMap(m) => {
58 unsafe { m.deref_mut() }
60 }
61 #[cfg(ipc)]
62 IpcBytesMutInner::MemMap(m) => &mut m[..len],
63 }
64 }
65}
66impl fmt::Debug for IpcBytesMut {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 write!(f, "IpcBytesMut(<{} bytes>)", self.len())
69 }
70}
impl IpcBytesMut {
    /// Allocates a zeroed buffer of `len` bytes.
    ///
    /// Storage is chosen by size: heap for `len <= INLINE_MAX`, anonymous
    /// shared memory for `len <= UNNAMED_MAX`, otherwise a file-backed map
    /// created on the `blocking` thread-pool. Without the `ipc` cfg the buffer
    /// is always heap-allocated.
    pub async fn new(len: usize) -> io::Result<IpcBytesMut> {
        // Note: under `cfg(ipc)` the `if`-chain below is the function's tail
        // expression; the `cfg(not(ipc))` block replaces it when ipc is off.
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            // File-backed maps involve filesystem work; do it off-thread.
            blocking::unblock(move || Self::new_blocking(len)).await
        }

        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Blocking counterpart of [`Self::new`]; same size-based storage choice,
    /// but the file-backed map is created on the calling thread.
    pub fn new_blocking(len: usize) -> io::Result<IpcBytesMut> {
        #[cfg(ipc)]
        if len <= IpcBytes::INLINE_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        } else if len <= IpcBytes::UNNAMED_MAX {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_byte(0, len)),
            })
        } else {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::MemMap(MemmapMut::new(len)?),
            })
        }
        #[cfg(not(ipc))]
        {
            Ok(IpcBytesMut {
                len,
                inner: IpcBytesMutInner::Heap(vec![0; len]),
            })
        }
    }

    /// Allocates a file-backed map of `len` bytes on the blocking thread-pool,
    /// regardless of size thresholds.
    #[cfg(ipc)]
    pub async fn new_memmap(len: usize) -> io::Result<Self> {
        blocking::unblock(move || Self::new_memmap_blocking(len)).await
    }

    /// Blocking counterpart of [`Self::new_memmap`].
    #[cfg(ipc)]
    pub fn new_memmap_blocking(len: usize) -> io::Result<Self> {
        Ok(Self {
            len,
            inner: IpcBytesMutInner::MemMap(MemmapMut::new(len)?),
        })
    }

    /// Builds a buffer from `buf`, reusing the allocation when it is small
    /// enough to stay on the heap; larger inputs are copied into shared
    /// memory / a file-backed map on the blocking thread-pool.
    pub async fn from_vec(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            // `buf` is moved into the closure; allocate then copy off-thread.
            blocking::unblock(move || {
                let mut b = Self::new_blocking(buf.len())?;
                b[..].copy_from_slice(&buf);
                Ok(b)
            })
            .await
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Blocking counterpart of [`Self::from_vec`].
    pub fn from_vec_blocking(buf: Vec<u8>) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        } else {
            let mut b = Self::new_blocking(buf.len())?;
            b[..].copy_from_slice(&buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf),
            })
        }
    }

    /// Copies `buf` into a new buffer; storage is chosen by `buf.len()` as in
    /// [`Self::new_blocking`].
    pub fn from_slice_blocking(buf: &[u8]) -> io::Result<Self> {
        #[cfg(ipc)]
        if buf.len() <= IpcBytes::INLINE_MAX {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf.to_vec()),
            })
        } else {
            let mut b = Self::new_blocking(buf.len())?;
            b[..].copy_from_slice(buf);
            Ok(b)
        }
        #[cfg(not(ipc))]
        {
            Ok(Self {
                len: buf.len(),
                inner: IpcBytesMutInner::Heap(buf.to_vec()),
            })
        }
    }

    /// Converts an immutable [`IpcBytes`] back into a mutable buffer on the
    /// blocking thread-pool (may copy; see [`Self::from_bytes_blocking`]).
    pub async fn from_bytes(bytes: IpcBytes) -> io::Result<Self> {
        blocking::unblock(move || Self::from_bytes_blocking(bytes)).await
    }

    /// Blocking counterpart of [`Self::from_bytes`].
    ///
    /// Heap-backed data with a unique `Arc` is taken over without copying;
    /// every other case copies the bytes via [`Self::from_slice_blocking`].
    pub fn from_bytes_blocking(bytes: IpcBytes) -> io::Result<Self> {
        #[cfg_attr(not(ipc), allow(irrefutable_let_patterns))]
        if let IpcBytesData::Heap(_) = &*bytes.0 {
            match Arc::try_unwrap(bytes.0) {
                Ok(r) => match r {
                    IpcBytesData::Heap(r) => Ok(Self {
                        len: r.len(),
                        inner: IpcBytesMutInner::Heap(r),
                    }),
                    // The outer `if let` already proved the Heap variant.
                    _ => unreachable!(),
                },
                // Arc is shared: re-wrap it so we can borrow the bytes and copy.
                Err(a) => Self::from_slice_blocking(&IpcBytes(a)[..]),
            }
        } else {
            Self::from_slice_blocking(&bytes[..])
        }
    }

    /// Memory-maps an existing user file (or a sub-`range` of it) writable,
    /// on the blocking thread-pool.
    ///
    /// # Safety
    /// Same contract as [`MemmapMut::write_user_file`] (not visible in this
    /// file — presumably no other writer may mutate the file while mapped;
    /// confirm against `ipc_bytes_memmap`).
    #[cfg(ipc)]
    pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
        blocking::unblock(move || {
            unsafe { Self::open_memmap_blocking(file, range) }
        })
        .await
    }

    /// Blocking counterpart of [`Self::open_memmap`].
    ///
    /// # Safety
    /// Same contract as [`MemmapMut::write_user_file`].
    #[cfg(ipc)]
    pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
        let map = unsafe { MemmapMut::write_user_file(file, range) }?;

        Ok(Self {
            len: map.len(),
            inner: IpcBytesMutInner::MemMap(map),
        })
    }

    /// Creates a user file of `len` bytes and maps it writable, on the
    /// blocking thread-pool.
    ///
    /// # Safety
    /// Same contract as [`MemmapMut::create_user_file`] (not visible in this
    /// file — confirm against `ipc_bytes_memmap`).
    #[cfg(ipc)]
    pub async unsafe fn create(file: PathBuf, len: usize) -> io::Result<Self> {
        blocking::unblock(move || {
            unsafe { Self::create_blocking(file, len) }
        })
        .await
    }

    /// Blocking counterpart of [`Self::create`].
    ///
    /// # Safety
    /// Same contract as [`MemmapMut::create_user_file`].
    #[cfg(ipc)]
    pub unsafe fn create_blocking(file: PathBuf, len: usize) -> io::Result<Self> {
        let map = unsafe { MemmapMut::create_user_file(file, len) }?;

        Ok(Self {
            len,
            inner: IpcBytesMutInner::MemMap(map),
        })
    }
}
impl IpcBytesMut {
    /// Freezes the buffer into an immutable, cheaply-cloneable [`IpcBytes`],
    /// picking the cheapest representation for the final (possibly truncated)
    /// length.
    ///
    /// NOTE(review): unlike the other async methods this performs the
    /// conversion inline rather than via `blocking::unblock`, even though
    /// `into_read_only` and the copies may block — confirm this is intended.
    pub async fn finish(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        // Swap the storage out with a cheap empty Vec so we can consume it.
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                v.shrink_to_fit();
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                // NOTE(review): these comparisons use `<` while the
                // constructors use `<=` for the same thresholds — confirm the
                // off-by-one asymmetry is intentional.
                if len < IpcBytes::INLINE_MAX {
                    // Shrunk below the inline threshold: copy back to heap.
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    // Truncated: re-allocate a right-sized shared segment.
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap(m) => {
                let m = m.into_read_only()?;
                IpcBytesData::MemMap(m)
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }

    /// Blocking counterpart of [`Self::finish`].
    ///
    /// NOTE(review): this variant skips the `shrink_to_fit` the async version
    /// performs on the heap path — confirm whether that asymmetry is intended.
    pub fn finish_blocking(mut self) -> io::Result<IpcBytes> {
        let len = self.len;
        let data = match std::mem::replace(&mut self.inner, IpcBytesMutInner::Heap(vec![])) {
            IpcBytesMutInner::Heap(mut v) => {
                v.truncate(len);
                IpcBytesData::Heap(v)
            }
            #[cfg(ipc)]
            IpcBytesMutInner::AnonMemMap(m) => {
                if len < IpcBytes::INLINE_MAX {
                    IpcBytesData::Heap(m[..len].to_vec())
                } else if len < m.len() {
                    IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&m[..len]))
                } else {
                    IpcBytesData::AnonMemMap(m)
                }
            }
            #[cfg(ipc)]
            IpcBytesMutInner::MemMap(m) => {
                let m = m.into_read_only()?;
                IpcBytesData::MemMap(m)
            }
        };
        Ok(IpcBytes(Arc::new(data)))
    }
}
372impl IpcBytesMut {
373 pub fn truncate(&mut self, new_len: usize) {
379 self.len = self.len.min(new_len);
380 }
381
382 pub fn reduce_in_place<const L0: usize, const L1: usize>(&mut self, mut reduce: impl FnMut([u8; L0]) -> [u8; L1]) {
392 assert!(L1 <= L0);
393
394 let self_ = &mut self[..];
395
396 let len = self_.len();
397 if len == 0 {
398 return;
399 }
400 assert!(len.is_multiple_of(L0), "length must be multiple of L0");
401
402 let ptr = self_.as_mut_ptr();
403 let mut write = 0usize;
404 let mut read = 0usize;
405
406 unsafe {
408 while read < len {
409 let mut in_chunk = MaybeUninit::<[u8; L0]>::uninit();
410 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr(), L0);
411 read += L0;
412
413 let out_chunk = reduce(in_chunk.assume_init());
414
415 std::ptr::copy_nonoverlapping(out_chunk.as_ptr(), ptr.add(write), L1);
416 write += L1;
417 }
418 }
419
420 self.truncate(write);
421 }
422
423 pub fn reduce_in_place_dyn(&mut self, in_chunk_len: usize, out_chunk_buf: &mut [u8], mut reduce: impl FnMut(&[u8], &mut [u8])) {
433 assert!(out_chunk_buf.len() < in_chunk_len);
434
435 let self_ = &mut self[..];
436
437 let len = self_.len();
438 if len == 0 {
439 return;
440 }
441 assert!(len.is_multiple_of(in_chunk_len), "length must be multiple of in_chunk_len");
442
443 let ptr = self_.as_mut_ptr();
444 let mut write = 0usize;
445 let mut read = 0usize;
446
447 unsafe {
449 while read < len {
450 reduce(std::slice::from_raw_parts(ptr.add(read), in_chunk_len), &mut *out_chunk_buf);
451 read += in_chunk_len;
452
453 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr(), ptr.add(write), out_chunk_buf.len());
454 write += out_chunk_buf.len();
455 }
456 }
457
458 self.truncate(write);
459 }
460
461 pub fn cast_reduce_in_place<T0, const L0: usize, T1, const L1: usize>(&mut self, mut reduce: impl FnMut([T0; L0]) -> [T1; L1])
471 where
472 T0: bytemuck::AnyBitPattern,
473 {
474 let l0 = std::mem::size_of::<T0>() * L0;
475 let l1 = std::mem::size_of::<T1>() * L1;
476 assert!(l1 <= l0);
477
478 let self_ = &mut self[..];
479
480 let len = self_.len();
481 if len == 0 {
482 return;
483 }
484 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * L0");
485
486 let ptr = self_.as_mut_ptr();
487 let mut write = 0usize;
488 let mut read = 0usize;
489
490 unsafe {
494 while read < len {
495 let mut in_chunk = MaybeUninit::<[T0; L0]>::uninit();
496 std::ptr::copy_nonoverlapping(ptr.add(read), (*in_chunk.as_mut_ptr()).as_mut_ptr() as _, l0);
497 read += l0;
498
499 let out_chunk = reduce(in_chunk.assume_init());
500
501 std::ptr::copy_nonoverlapping(out_chunk.as_ptr() as _, ptr.add(write), l1);
502 write += l1;
503 }
504 }
505
506 self.truncate(write);
507 }
508
509 pub fn cast_reduce_in_place_dyn<T0, T1>(
521 &mut self,
522 in_chunk_len: usize,
523 out_chunk_buf: &mut [T1],
524 mut reduce: impl FnMut(&[T0], &mut [T1]),
525 ) where
526 T0: bytemuck::AnyBitPattern,
527 {
528 let l0 = std::mem::size_of::<T0>() * in_chunk_len;
529 let l1 = std::mem::size_of_val(out_chunk_buf);
530
531 assert!(l1 <= l0);
532
533 let self_ = &mut self[..];
534
535 let len = self_.len();
536 if len == 0 {
537 return;
538 }
539 assert!(len.is_multiple_of(l0), "length must be multiple of size_of::<T0>() * in_chunk_len");
540
541 let ptr = self_.as_mut_ptr();
542 let mut write = 0usize;
543 let mut read = 0usize;
544
545 unsafe {
547 while read < len {
548 reduce(
549 bytemuck::cast_slice(std::slice::from_raw_parts(ptr.add(read), l0)),
550 &mut *out_chunk_buf,
551 );
552 read += l0;
553
554 std::ptr::copy_nonoverlapping(out_chunk_buf.as_ptr() as _, ptr.add(write), l1);
555 write += l1;
556 }
557 }
558
559 self.truncate(write);
560 }
561
562 pub fn reverse_chunks<const L: usize>(&mut self) {
570 let self_ = &mut self[..];
571
572 let len = self_.len();
573
574 if len == 0 || L == 0 {
575 return;
576 }
577
578 if L == 1 {
579 return self_.reverse();
580 }
581
582 assert!(len.is_multiple_of(L), "length must be multiple of L");
583
584 unsafe { self_.as_chunks_unchecked_mut::<L>() }.reverse();
586 }
587
588 pub fn reverse_chunks_dyn(&mut self, chunk_len: usize) {
594 let self_ = &mut self[..];
595
596 let len = self_.len();
597
598 if len == 0 || chunk_len == 0 {
599 return;
600 }
601
602 if chunk_len == 1 {
603 return self_.reverse();
604 }
605
606 assert!(len.is_multiple_of(chunk_len), "length must be multiple of chunk_len");
607
608 let mut a = 0;
609 let mut b = len - chunk_len;
610
611 let ptr = self_.as_mut_ptr();
612
613 unsafe {
615 while a < b {
616 std::ptr::swap_nonoverlapping(ptr.add(a), ptr.add(b), chunk_len);
617 a += chunk_len;
618 b -= chunk_len;
619 }
620 }
621 }
622}
623
/// Async `Write`/`Seek` sink produced by [`IpcBytes::new_writer`]; wraps the
/// blocking writer so its work runs on the `blocking` thread-pool.
pub struct IpcBytesWriter {
    // Blocking writer driven via `blocking::Unblock`.
    inner: blocking::Unblock<IpcBytesWriterBlocking>,
}
630impl IpcBytesWriter {
631 pub async fn finish(self) -> std::io::Result<IpcBytes> {
633 let inner = self.inner.into_inner().await;
634 blocking::unblock(move || inner.finish()).await
635 }
636
637 pub async fn finish_mut(self) -> std::io::Result<super::IpcBytesMut> {
639 let inner = self.inner.into_inner().await;
640 blocking::unblock(move || inner.finish_mut()).await
641 }
642}
643impl crate::io::AsyncWrite for IpcBytesWriter {
644 fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> std::task::Poll<io::Result<usize>> {
645 crate::io::AsyncWrite::poll_write(Pin::new(&mut Pin::get_mut(self).inner), cx, buf)
646 }
647
648 fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
649 crate::io::AsyncWrite::poll_flush(Pin::new(&mut Pin::get_mut(self).inner), cx)
650 }
651
652 fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<io::Result<()>> {
653 crate::io::AsyncWrite::poll_close(Pin::new(&mut Pin::get_mut(self).inner), cx)
654 }
655}
656impl crate::io::AsyncSeek for IpcBytesWriter {
657 fn poll_seek(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, pos: io::SeekFrom) -> std::task::Poll<io::Result<u64>> {
658 crate::io::AsyncSeek::poll_seek(Pin::new(&mut Pin::get_mut(self).inner), cx, pos)
659 }
660}
661
/// Synchronous `Write`/`Seek` sink that accumulates bytes into the most
/// appropriate [`IpcBytes`] backing storage.
pub struct IpcBytesWriterBlocking {
    // In-memory staging buffer; spilled to `memmap` once it would exceed
    // `IpcBytes::UNNAMED_MAX`, or on the first seek.
    #[cfg(ipc)]
    heap_buf: Vec<u8>,
    // Spill file once the payload moved to a memory-mapped file.
    #[cfg(ipc)]
    memmap: Option<std::fs::File>,

    // Without `ipc` everything stays on the heap; `Cursor` provides `Seek`.
    #[cfg(not(ipc))]
    heap_buf: std::io::Cursor<Vec<u8>>,
}
impl IpcBytesWriterBlocking {
    /// Finishes writing and freezes the result into an [`IpcBytes`].
    pub fn finish(self) -> std::io::Result<IpcBytes> {
        let m = self.finish_mut()?;
        m.finish_blocking()
    }

    /// Finishes writing and returns the accumulated bytes as an
    /// [`IpcBytesMut`], choosing the storage that matches the final size.
    pub fn finish_mut(mut self) -> std::io::Result<super::IpcBytesMut> {
        // Drain any staged bytes into the spill file (no-op without one).
        self.flush()?;
        #[cfg(ipc)]
        {
            let (len, inner) = match self.memmap {
                Some(file) => {
                    // Data spilled to a file: map it back for mutation.
                    let map = MemmapMut::end_write(file)?;
                    let len = map.len();
                    (len, IpcBytesMutInner::MemMap(map))
                }
                None => {
                    let len = self.heap_buf.len();
                    // Promote to anonymous shared memory past the inline limit.
                    let i = if self.heap_buf.len() > IpcBytes::INLINE_MAX {
                        IpcBytesMutInner::AnonMemMap(IpcSharedMemory::from_bytes(&self.heap_buf))
                    } else {
                        IpcBytesMutInner::Heap(self.heap_buf)
                    };
                    (len, i)
                }
            };
            Ok(IpcBytesMut { len, inner })
        }
        #[cfg(not(ipc))]
        {
            let heap_buf = self.heap_buf.into_inner();
            let len = heap_buf.len();
            let inner = IpcBytesMutInner::Heap(heap_buf);
            Ok(IpcBytesMut { len, inner })
        }
    }

    /// Ensures the spill file exists and drains the heap stage into it.
    #[cfg(ipc)]
    fn alloc_memmap_file(&mut self) -> io::Result<()> {
        if self.memmap.is_none() {
            self.memmap = Some(MemmapMut::begin_write()?);
        }
        let file = &mut self.memmap.as_mut().unwrap();

        // Preserve ordering: everything staged so far goes to the file first.
        file.write_all(&self.heap_buf)?;
        self.heap_buf.clear();
        Ok(())
    }
}
726impl std::io::Write for IpcBytesWriterBlocking {
727 fn write(&mut self, write_buf: &[u8]) -> io::Result<usize> {
728 #[cfg(ipc)]
729 {
730 if self.heap_buf.len() + write_buf.len() > IpcBytes::UNNAMED_MAX {
731 self.alloc_memmap_file()?;
733
734 if write_buf.len() > IpcBytes::UNNAMED_MAX {
735 self.memmap.as_mut().unwrap().write_all(write_buf)?;
737 } else {
738 self.heap_buf.extend_from_slice(write_buf);
739 }
740 } else {
741 if self.memmap.is_none() {
742 self.heap_buf
744 .reserve_exact((self.heap_buf.capacity().max(1024) * 2).min(IpcBytes::UNNAMED_MAX));
745 }
746 self.heap_buf.extend_from_slice(write_buf);
747 }
748
749 Ok(write_buf.len())
750 }
751
752 #[cfg(not(ipc))]
753 {
754 std::io::Write::write(&mut self.heap_buf, write_buf)
755 }
756 }
757
758 fn flush(&mut self) -> io::Result<()> {
759 #[cfg(ipc)]
760 if let Some(file) = &mut self.memmap {
761 if !self.heap_buf.is_empty() {
762 file.write_all(&self.heap_buf)?;
763 self.heap_buf.clear();
764 }
765 file.flush()?;
766 }
767 Ok(())
768 }
769}
770impl std::io::Seek for IpcBytesWriterBlocking {
771 fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
772 #[cfg(ipc)]
773 {
774 self.alloc_memmap_file()?;
775 let file = self.memmap.as_mut().unwrap();
776 if !self.heap_buf.is_empty() {
777 file.write_all(&self.heap_buf)?;
778 self.heap_buf.clear();
779 }
780 file.seek(pos)
781 }
782 #[cfg(not(ipc))]
783 {
784 std::io::Seek::seek(&mut self.heap_buf, pos)
785 }
786 }
787}
788
impl IpcBytes {
    /// Creates an async writer that streams bytes into an [`IpcBytes`].
    ///
    /// NOTE(review): the body does no awaiting; `async` here appears to be
    /// for API symmetry with the other constructors — confirm.
    pub async fn new_writer() -> IpcBytesWriter {
        IpcBytesWriter {
            inner: blocking::Unblock::new(Self::new_writer_blocking()),
        }
    }

    /// Creates the synchronous writer backing [`Self::new_writer`], starting
    /// with an empty heap stage (and no spill file under `cfg(ipc)`).
    pub fn new_writer_blocking() -> IpcBytesWriterBlocking {
        IpcBytesWriterBlocking {
            #[cfg(ipc)]
            heap_buf: vec![],
            #[cfg(ipc)]
            memmap: None,

            #[cfg(not(ipc))]
            heap_buf: std::io::Cursor::new(vec![]),
        }
    }
}