zng_task/channel/
ipc_bytes.rs1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{
4 cell::Cell,
5 fmt, fs,
6 io::{self, Read, Write},
7 iter::FusedIterator,
8 ops,
9 path::{Path, PathBuf},
10 pin::Pin,
11 sync::{Arc, Weak},
12};
13
14use futures_lite::{AsyncReadExt as _, AsyncWriteExt as _};
15#[cfg(ipc)]
16use ipc_channel::ipc::IpcSharedMemory;
17use serde::{Deserialize, Serialize};
18use zng_app_context::RunOnDrop;
19
20#[cfg(ipc)]
21use crate::channel::ipc_bytes_memmap::Memmap;
22
23#[derive(Clone)]
47#[repr(transparent)]
48pub struct IpcBytes(pub(super) Arc<IpcBytesData>);
49impl Serialize for IpcBytes {
50 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
51 where
52 S: serde::Serializer,
53 {
54 Serialize::serialize(&*self.0, serializer)
55 }
56}
57impl<'de> Deserialize<'de> for IpcBytes {
58 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
59 where
60 D: serde::Deserializer<'de>,
61 {
62 let b = <IpcBytesData as Deserialize>::deserialize(deserializer)?;
63 Ok(Self(Arc::new(b)))
64 }
65}
66#[derive(Serialize, Deserialize)]
67pub(super) enum IpcBytesData {
68 Heap(Vec<u8>),
69 #[cfg(ipc)]
70 AnonMemMap(IpcSharedMemory),
71 #[cfg(ipc)]
72 MemMap(Memmap),
73}
74impl fmt::Debug for IpcBytes {
75 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
76 write!(f, "IpcBytes(<{} bytes>)", self.len())
77 }
78}
79impl ops::Deref for IpcBytes {
80 type Target = [u8];
81
82 fn deref(&self) -> &Self::Target {
83 match &*self.0 {
84 IpcBytesData::Heap(i) => i,
85 #[cfg(ipc)]
86 IpcBytesData::AnonMemMap(m) => m,
87 #[cfg(ipc)]
88 IpcBytesData::MemMap(f) => f,
89 }
90 }
91}
92
93impl IpcBytes {
94 pub fn empty() -> Self {
96 IpcBytes(Arc::new(IpcBytesData::Heap(vec![])))
97 }
98}
99
100impl IpcBytes {
104 pub async fn from_vec(data: Vec<u8>) -> io::Result<Self> {
106 blocking::unblock(move || Self::from_vec_blocking(data)).await
107 }
108
109 pub fn from_vec_blocking(data: Vec<u8>) -> io::Result<Self> {
111 #[cfg(ipc)]
112 {
113 if data.len() <= Self::INLINE_MAX {
114 Ok(Self(Arc::new(IpcBytesData::Heap(data))))
115 } else {
116 Self::from_slice_blocking(&data)
117 }
118 }
119 #[cfg(not(ipc))]
120 {
121 Ok(Self(Arc::new(IpcBytesData::Heap(data))))
122 }
123 }
124
125 pub fn from_slice_blocking(data: &[u8]) -> io::Result<Self> {
127 #[cfg(ipc)]
128 {
129 if data.len() <= Self::INLINE_MAX {
130 Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
131 } else if data.len() <= Self::UNNAMED_MAX {
132 Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(data)))))
133 } else {
134 Self::new_memmap_blocking(|m| m.write_all(data))
135 }
136 }
137 #[cfg(not(ipc))]
138 {
139 Ok(Self(Arc::new(IpcBytesData::Heap(data.to_vec()))))
140 }
141 }
142
143 pub async fn from_iter(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
154 #[cfg(ipc)]
155 {
156 let (min, max) = iter.size_hint();
157 if let Some(max) = max {
158 if max <= Self::INLINE_MAX {
159 return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
160 } else if max == min {
161 let mut r = super::IpcBytesMut::new(max).await?;
162 let mut actual_len = 0;
163 for (i, b) in r.iter_mut().zip(iter) {
164 *i = b;
165 actual_len += 1;
166 }
167 r.truncate(actual_len);
168 return r.finish().await;
169 }
170 }
171
172 let mut writer = Self::new_writer().await;
173 for b in iter {
174 writer.write_all(&[b]).await?;
175 }
176 writer.finish().await
177 }
178
179 #[cfg(not(ipc))]
180 {
181 Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
182 }
183 }
184
185 pub fn from_iter_blocking(iter: impl Iterator<Item = u8>) -> io::Result<Self> {
197 #[cfg(ipc)]
198 {
199 let (min, max) = iter.size_hint();
200 if let Some(max) = max {
201 if max <= Self::INLINE_MAX {
202 return Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))));
203 } else if max == min {
204 let mut r = super::IpcBytesMut::new_blocking(max)?;
205 let mut actual_len = 0;
206 for (i, b) in r.iter_mut().zip(iter) {
207 *i = b;
208 actual_len += 1;
209 }
210 r.truncate(actual_len);
211 return r.finish_blocking();
212 }
213 }
214
215 let mut writer = Self::new_writer_blocking();
216 for b in iter {
217 writer.write_all(&[b])?;
218 }
219 writer.finish()
220 }
221 #[cfg(not(ipc))]
222 {
223 Ok(Self(Arc::new(IpcBytesData::Heap(iter.collect()))))
224 }
225 }
226
227 pub async fn from_read(data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
229 #[cfg(ipc)]
230 {
231 Self::from_read_ipc(data).await
232 }
233 #[cfg(not(ipc))]
234 {
235 let mut data = data;
236 let mut buf = vec![];
237 data.read_to_end(&mut buf).await;
238 Self::from_vec(buf).await
239 }
240 }
241 #[cfg(ipc)]
242 async fn from_read_ipc(mut data: Pin<&mut (dyn futures_lite::AsyncRead + Send)>) -> io::Result<Self> {
243 let mut buf = vec![0u8; Self::INLINE_MAX + 1];
244 let mut len = 0;
245
246 loop {
248 match data.read(&mut buf[len..]).await {
249 Ok(l) => {
250 if l == 0 {
251 buf.truncate(len);
253 return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
254 } else {
255 len += l;
256 if len == Self::INLINE_MAX + 1 {
257 break;
259 }
260 }
261 }
262 Err(e) => match e.kind() {
263 io::ErrorKind::WouldBlock => continue,
264 _ => return Err(e),
265 },
266 }
267 }
268
269 buf.resize(Self::UNNAMED_MAX + 1, 0);
271 loop {
272 match data.read(&mut buf[len..]).await {
273 Ok(l) => {
274 if l == 0 {
275 return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
277 } else {
278 len += l;
279 if len == Self::UNNAMED_MAX + 1 {
280 break;
282 }
283 }
284 }
285 Err(e) => match e.kind() {
286 io::ErrorKind::WouldBlock => continue,
287 _ => return Err(e),
288 },
289 }
290 }
291
292 Self::new_memmap(async |m| {
294 use futures_lite::AsyncWriteExt as _;
295
296 m.write_all(&buf).await?;
297 crate::io::copy(data, m).await?;
298 Ok(())
299 })
300 .await
301 }
302
303 pub fn from_read_blocking(data: &mut dyn io::Read) -> io::Result<Self> {
305 #[cfg(ipc)]
306 {
307 Self::from_read_blocking_ipc(data)
308 }
309 #[cfg(not(ipc))]
310 {
311 let mut buf = vec![];
312 data.read_to_end(&mut buf)?;
313 Self::from_vec_blocking(buf)
314 }
315 }
316 #[cfg(ipc)]
317 fn from_read_blocking_ipc(data: &mut dyn io::Read) -> io::Result<Self> {
318 let mut buf = vec![0u8; Self::INLINE_MAX + 1];
319 let mut len = 0;
320
321 loop {
323 match data.read(&mut buf[len..]) {
324 Ok(l) => {
325 if l == 0 {
326 buf.truncate(len);
328 return Ok(Self(Arc::new(IpcBytesData::Heap(buf))));
329 } else {
330 len += l;
331 if len == Self::INLINE_MAX + 1 {
332 break;
334 }
335 }
336 }
337 Err(e) => match e.kind() {
338 io::ErrorKind::WouldBlock => continue,
339 _ => return Err(e),
340 },
341 }
342 }
343
344 buf.resize(Self::UNNAMED_MAX + 1, 0);
346 loop {
347 match data.read(&mut buf[len..]) {
348 Ok(l) => {
349 if l == 0 {
350 return Ok(Self(Arc::new(IpcBytesData::AnonMemMap(IpcSharedMemory::from_bytes(&buf[..len])))));
352 } else {
353 len += l;
354 if len == Self::UNNAMED_MAX + 1 {
355 break;
357 }
358 }
359 }
360 Err(e) => match e.kind() {
361 io::ErrorKind::WouldBlock => continue,
362 _ => return Err(e),
363 },
364 }
365 }
366
367 Self::new_memmap_blocking(|m| {
369 m.write_all(&buf)?;
370 io::copy(data, m)?;
371 Ok(())
372 })
373 }
374
375 pub async fn from_path(path: PathBuf) -> io::Result<Self> {
377 let file = crate::fs::File::open(path).await?;
378 Self::from_file(file).await
379 }
380 pub async fn from_file(mut file: crate::fs::File) -> io::Result<Self> {
382 #[cfg(ipc)]
383 {
384 let len = file.metadata().await?.len();
385 if len <= Self::UNNAMED_MAX as u64 {
386 let mut buf = vec![0u8; len as usize];
387 file.read_exact(&mut buf).await?;
388 Self::from_vec_blocking(buf)
389 } else {
390 Self::new_memmap(async move |m| {
391 crate::io::copy(&mut file, m).await?;
392 Ok(())
393 })
394 .await
395 }
396 }
397 #[cfg(not(ipc))]
398 {
399 let mut buf = vec![];
400 file.read_to_end(&mut buf).await?;
401 Self::from_vec_blocking(buf)
402 }
403 }
404
405 pub fn from_path_blocking(path: &Path) -> io::Result<Self> {
407 let file = fs::File::open(path)?;
408 Self::from_file_blocking(file)
409 }
410 pub fn from_file_blocking(mut file: fs::File) -> io::Result<Self> {
412 #[cfg(ipc)]
413 {
414 let len = file.metadata()?.len();
415 if len <= Self::UNNAMED_MAX as u64 {
416 let mut buf = vec![0u8; len as usize];
417 file.read_exact(&mut buf)?;
418 Self::from_vec_blocking(buf)
419 } else {
420 Self::new_memmap_blocking(|m| {
421 io::copy(&mut file, m)?;
422 Ok(())
423 })
424 }
425 }
426 #[cfg(not(ipc))]
427 {
428 let mut buf = vec![];
429 file.read_to_end(&mut buf)?;
430 Self::from_vec_blocking(buf)
431 }
432 }
433
434 #[cfg(ipc)]
439 pub async fn new_memmap(write: impl AsyncFnOnce(&mut crate::fs::File) -> io::Result<()>) -> io::Result<Self> {
440 use crate::channel::ipc_bytes_memmap::MemmapMut;
441
442 let file = blocking::unblock(MemmapMut::begin_write).await?;
443 let mut file = crate::fs::File::from(file);
444 write(&mut file).await?;
445
446 match file.try_unwrap().await {
447 Ok(f) => {
448 let map = blocking::unblock(move || Memmap::end_write(f)).await?;
449 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
450 }
451 Err(_) => Err(io::Error::new(
452 io::ErrorKind::ResourceBusy,
453 "no all tasks started by `write` awaited before return",
454 )),
455 }
456 }
457
458 #[cfg(ipc)]
468 pub async unsafe fn open_memmap(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
469 blocking::unblock(move || {
470 unsafe { Self::open_memmap_blocking(file, range) }
472 })
473 .await
474 }
475
476 #[cfg(ipc)]
481 pub fn new_memmap_blocking(write: impl FnOnce(&mut fs::File) -> io::Result<()>) -> io::Result<Self> {
482 use crate::channel::ipc_bytes_memmap::MemmapMut;
483
484 let mut file = MemmapMut::begin_write()?;
485 write(&mut file)?;
486 let map = Memmap::end_write(file)?;
487
488 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
489 }
490
491 #[cfg(ipc)]
501 pub unsafe fn open_memmap_blocking(file: PathBuf, range: Option<ops::Range<u64>>) -> io::Result<Self> {
502 let map = unsafe { Memmap::read_user_file(file, range) }?;
504
505 Ok(Self(Arc::new(IpcBytesData::MemMap(map))))
506 }
507
508 pub fn ptr_eq(&self, other: &Self) -> bool {
510 let a = &self[..];
511 let b = &other[..];
512 (std::ptr::eq(a, b) && a.len() == b.len()) || (a.is_empty() && b.is_empty())
513 }
514
515 #[cfg(ipc)]
516 pub(super) const INLINE_MAX: usize = 64 * 1024; #[cfg(ipc)]
518 pub(super) const UNNAMED_MAX: usize = 128 * 1024 * 1024; }
520
521impl AsRef<[u8]> for IpcBytes {
522 fn as_ref(&self) -> &[u8] {
523 &self[..]
524 }
525}
526impl Default for IpcBytes {
527 fn default() -> Self {
528 Self::empty()
529 }
530}
531impl PartialEq for IpcBytes {
532 fn eq(&self, other: &Self) -> bool {
533 self.ptr_eq(other) || self[..] == other[..]
534 }
535}
536impl Eq for IpcBytes {}
537
538#[cfg(ipc)]
546pub fn with_ipc_serialization<R>(serialize: impl FnOnce() -> R) -> R {
547 let parent = IPC_SERIALIZATION.replace(true);
548 let _clean = RunOnDrop::new(|| IPC_SERIALIZATION.set(parent));
549 serialize()
550}
551
552#[cfg(ipc)]
554pub fn is_ipc_serialization() -> bool {
555 IPC_SERIALIZATION.get()
556}
557
558#[cfg(ipc)]
559thread_local! {
560 static IPC_SERIALIZATION: Cell<bool> = const { Cell::new(false) };
561}
562
563impl IpcBytes {
564 pub fn downgrade(&self) -> WeakIpcBytes {
568 WeakIpcBytes(Arc::downgrade(&self.0))
569 }
570}
571
572pub struct WeakIpcBytes(Weak<IpcBytesData>);
574impl WeakIpcBytes {
575 pub fn upgrade(&self) -> Option<IpcBytes> {
577 self.0.upgrade().map(IpcBytes)
578 }
579
580 pub fn strong_count(&self) -> usize {
582 self.0.strong_count()
583 }
584}
585
586type SliceIter<'a> = std::slice::Iter<'a, u8>;
590self_cell::self_cell! {
591 struct IpcBytesIntoIterInner {
592 owner: IpcBytes,
593 #[covariant]
594 dependent: SliceIter,
595 }
596}
597
598pub struct IpcBytesIntoIter(IpcBytesIntoIterInner);
600impl IpcBytesIntoIter {
601 fn new(bytes: IpcBytes) -> Self {
602 Self(IpcBytesIntoIterInner::new(bytes, |b| b.iter()))
603 }
604
605 pub fn source(&self) -> &IpcBytes {
607 self.0.borrow_owner()
608 }
609
610 pub fn rest(&self) -> &[u8] {
612 self.0.borrow_dependent().as_slice()
613 }
614}
615impl Iterator for IpcBytesIntoIter {
616 type Item = u8;
617
618 fn next(&mut self) -> Option<u8> {
619 self.0.with_dependent_mut(|_, d| d.next().copied())
620 }
621
622 fn size_hint(&self) -> (usize, Option<usize>) {
623 self.0.borrow_dependent().size_hint()
624 }
625
626 fn count(self) -> usize
627 where
628 Self: Sized,
629 {
630 self.0.borrow_dependent().as_slice().len()
631 }
632
633 fn nth(&mut self, n: usize) -> Option<u8> {
634 self.0.with_dependent_mut(|_, d| d.nth(n).copied())
635 }
636
637 fn last(mut self) -> Option<Self::Item>
638 where
639 Self: Sized,
640 {
641 self.next_back()
642 }
643}
644impl DoubleEndedIterator for IpcBytesIntoIter {
645 fn next_back(&mut self) -> Option<Self::Item> {
646 self.0.with_dependent_mut(|_, d| d.next_back().copied())
647 }
648
649 fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
650 self.0.with_dependent_mut(|_, d| d.nth_back(n).copied())
651 }
652}
653impl FusedIterator for IpcBytesIntoIter {}
654impl Default for IpcBytesIntoIter {
655 fn default() -> Self {
656 IpcBytes::empty().into_iter()
657 }
658}
659impl IntoIterator for IpcBytes {
660 type Item = u8;
661
662 type IntoIter = IpcBytesIntoIter;
663
664 fn into_iter(self) -> Self::IntoIter {
665 IpcBytesIntoIter::new(self)
666 }
667}