zng_task/channel/
ipc_bytes_cast.rs1#![cfg_attr(not(ipc), allow(unused))]
2
3use std::{fmt, io, iter::FusedIterator, marker::PhantomData, ops};
4
5use crate::channel::{IpcBytes, IpcBytesIntoIter, IpcBytesMut};
6
7pub struct IpcBytesMutCast<T: bytemuck::AnyBitPattern> {
11 bytes: IpcBytesMut,
12 _t: PhantomData<T>,
13}
14impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesMutCast<T> {
15 type Target = [T];
16
17 fn deref(&self) -> &Self::Target {
18 bytemuck::cast_slice::<u8, T>(&self.bytes)
19 }
20}
21impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> ops::DerefMut for IpcBytesMutCast<T> {
22 fn deref_mut(&mut self) -> &mut Self::Target {
23 bytemuck::cast_slice_mut::<u8, T>(&mut self.bytes)
24 }
25}
26impl<T: bytemuck::AnyBitPattern> IpcBytesMutCast<T> {
27 pub fn into_inner(self) -> IpcBytesMut {
29 self.bytes
30 }
31}
32impl<T: bytemuck::AnyBitPattern> From<IpcBytesMutCast<T>> for IpcBytesMut {
33 fn from(value: IpcBytesMutCast<T>) -> Self {
34 value.bytes
35 }
36}
37fn item_len_to_bytes<T: 'static>(len: usize) -> io::Result<usize> {
38 match len.checked_mul(size_of::<T>()) {
39 Some(l) => Ok(l),
40 None => Err(io::Error::new(io::ErrorKind::FileTooLarge, "cannot map more than usize::MAX")),
41 }
42}
43impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
44 pub async fn new(len: usize) -> io::Result<Self> {
46 IpcBytesMut::new(item_len_to_bytes::<T>(len)?).await.map(IpcBytesMut::cast)
47 }
48
49 pub fn new_blocking(len: usize) -> io::Result<Self> {
51 IpcBytesMut::new_blocking(item_len_to_bytes::<T>(len)?).map(IpcBytesMut::cast)
52 }
53
54 #[cfg(ipc)]
61 pub async fn new_memmap(len: usize) -> io::Result<Self> {
62 IpcBytesMut::new_memmap(item_len_to_bytes::<T>(len)?).await.map(IpcBytesMut::cast)
63 }
64
65 #[cfg(ipc)]
72 pub fn new_memmap_blocking(len: usize) -> io::Result<Self> {
73 IpcBytesMut::new_memmap_blocking(item_len_to_bytes::<T>(len)?).map(IpcBytesMut::cast)
74 }
75
76 pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
78 IpcBytesMut::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytesMut::cast)
79 }
80
81 pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
83 IpcBytesMut::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytesMut::cast)
84 }
85
86 pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
88 IpcBytesMut::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytesMut::cast)
89 }
90
91 pub fn as_bytes(&mut self) -> &mut IpcBytesMut {
93 &mut self.bytes
94 }
95}
96impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesMutCast<T> {
97 pub async fn finish(self) -> io::Result<IpcBytesCast<T>> {
99 self.bytes.finish().await.map(IpcBytes::cast)
100 }
101
102 pub fn finish_blocking(self) -> io::Result<IpcBytesCast<T>> {
104 self.bytes.finish_blocking().map(IpcBytes::cast)
105 }
106}
107
108impl IpcBytesMut {
109 pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesMutCast<T> {
119 let r = IpcBytesMutCast {
120 bytes: self,
121 _t: PhantomData,
122 };
123 let _assert = &r[..];
124 r
125 }
126
127 pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
135 bytemuck::cast_slice(self)
136 }
137
138 pub fn cast_deref_mut<T: bytemuck::AnyBitPattern + bytemuck::NoUninit>(&mut self) -> &mut [T] {
146 bytemuck::cast_slice_mut(self)
147 }
148}
149
150pub struct IpcBytesCast<T: bytemuck::AnyBitPattern> {
154 bytes: IpcBytes,
155 _t: PhantomData<T>,
156}
157impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCast<T> {
158 fn default() -> Self {
159 Self {
160 bytes: Default::default(),
161 _t: PhantomData,
162 }
163 }
164}
165impl<T: bytemuck::AnyBitPattern> ops::Deref for IpcBytesCast<T> {
166 type Target = [T];
167
168 fn deref(&self) -> &Self::Target {
169 bytemuck::cast_slice::<u8, T>(&self.bytes)
170 }
171}
172impl<T: bytemuck::AnyBitPattern> IpcBytesCast<T> {
173 pub fn into_inner(self) -> IpcBytes {
175 self.bytes
176 }
177}
178impl<T: bytemuck::AnyBitPattern> From<IpcBytesCast<T>> for IpcBytes {
179 fn from(value: IpcBytesCast<T>) -> Self {
180 value.bytes
181 }
182}
183impl<T: bytemuck::AnyBitPattern> Clone for IpcBytesCast<T> {
184 fn clone(&self) -> Self {
185 Self {
186 bytes: self.bytes.clone(),
187 _t: PhantomData,
188 }
189 }
190}
191impl<T: bytemuck::AnyBitPattern> fmt::Debug for IpcBytesCast<T> {
192 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193 write!(f, "IpcBytesCast<{}>(<{} items>)", std::any::type_name::<T>(), self.len())
194 }
195}
196impl<T: bytemuck::AnyBitPattern> serde::Serialize for IpcBytesCast<T> {
197 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
198 where
199 S: serde::Serializer,
200 {
201 self.bytes.serialize(serializer)
202 }
203}
204impl<'de, T: bytemuck::AnyBitPattern> serde::Deserialize<'de> for IpcBytesCast<T> {
205 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
206 where
207 D: serde::Deserializer<'de>,
208 {
209 let bytes = IpcBytes::deserialize(deserializer)?;
210 Ok(bytes.cast())
211 }
212}
213impl<T: bytemuck::AnyBitPattern> PartialEq for IpcBytesCast<T> {
214 fn eq(&self, other: &Self) -> bool {
215 self.bytes == other.bytes
216 }
217}
218impl<T: bytemuck::AnyBitPattern> Eq for IpcBytesCast<T> {}
219impl<T: bytemuck::AnyBitPattern + bytemuck::NoUninit> IpcBytesCast<T> {
220 pub async fn from_vec(data: Vec<T>) -> io::Result<Self> {
222 IpcBytes::from_vec(bytemuck::cast_vec(data)).await.map(IpcBytes::cast)
223 }
224
225 pub async fn from_iter(iter: impl Iterator<Item = T>) -> io::Result<Self> {
236 #[cfg(ipc)]
237 {
238 let (min, max) = iter.size_hint();
239 let l = size_of::<T>();
240 let min = min * l;
241 let max = max.map(|m| m * l);
242 if let Some(max) = max {
243 if max <= IpcBytes::INLINE_MAX {
244 return Self::from_vec(iter.collect()).await;
245 } else if max == min {
246 let mut r = IpcBytesMut::new(max).await?;
247 let mut actual_len = 0;
248 for (i, f) in r.chunks_exact_mut(l).zip(iter) {
249 i.copy_from_slice(bytemuck::bytes_of(&f));
250 actual_len += 1;
251 }
252 r.truncate(actual_len * l);
253 return r.finish().await.map(IpcBytes::cast);
254 }
255 }
256
257 let mut writer = IpcBytes::new_writer().await;
258 for f in iter {
259 use futures_lite::AsyncWriteExt as _;
260
261 writer.write_all(bytemuck::bytes_of(&f)).await?;
262 }
263 writer.finish().await.map(IpcBytes::cast)
264 }
265 #[cfg(not(ipc))]
266 {
267 Self::from_vec(iter.collect()).await
268 }
269 }
270
271 pub fn from_vec_blocking(data: Vec<T>) -> io::Result<Self> {
273 IpcBytes::from_vec_blocking(bytemuck::cast_vec(data)).map(IpcBytes::cast)
274 }
275
276 pub fn from_slice_blocking(data: &[T]) -> io::Result<Self> {
278 IpcBytes::from_slice_blocking(bytemuck::cast_slice(data)).map(IpcBytes::cast)
279 }
280
281 pub fn from_iter_blocking(mut iter: impl Iterator<Item = T>) -> io::Result<Self> {
292 #[cfg(ipc)]
293 {
294 let (min, max) = iter.size_hint();
295 let l = size_of::<T>();
296 let min = min * l;
297 let max = max.map(|m| m * l);
298 if let Some(max) = max {
299 if max <= IpcBytes::INLINE_MAX {
300 return Self::from_vec_blocking(iter.collect());
301 } else if max == min {
302 let mut r = IpcBytesMut::new_blocking(max)?;
303 let mut actual_len = 0;
304 for (i, f) in r.chunks_exact_mut(l).zip(&mut iter) {
305 i.copy_from_slice(bytemuck::bytes_of(&f));
306 actual_len += 1;
307 }
308 r.truncate(actual_len * l);
309 return r.finish_blocking().map(IpcBytes::cast);
310 }
311 }
312
313 let mut writer = IpcBytes::new_writer_blocking();
314 for f in iter {
315 use std::io::Write as _;
316
317 writer.write_all(bytemuck::bytes_of(&f))?;
318 }
319 writer.finish().map(IpcBytes::cast)
320 }
321 #[cfg(not(ipc))]
322 {
323 Self::from_vec_blocking(iter.collect())
324 }
325 }
326
327 pub fn as_bytes(&self) -> &IpcBytes {
329 &self.bytes
330 }
331}
332
333impl IpcBytes {
334 pub fn cast<T: bytemuck::AnyBitPattern>(self) -> IpcBytesCast<T> {
344 let r = IpcBytesCast {
345 bytes: self,
346 _t: PhantomData,
347 };
348 let _assert = &r[..];
349 r
350 }
351
352 pub fn cast_deref<T: bytemuck::AnyBitPattern>(&self) -> &[T] {
360 bytemuck::cast_slice(self)
361 }
362}
363
364pub struct IpcBytesCastIntoIter<T: bytemuck::AnyBitPattern>(IpcBytesIntoIter, IpcBytesCast<T>);
366impl<T: bytemuck::AnyBitPattern> IpcBytesCastIntoIter<T> {
367 fn new(bytes: IpcBytesCast<T>) -> Self {
368 Self(bytes.bytes.clone().into_iter(), bytes)
369 }
370
371 pub fn source(&self) -> &IpcBytesCast<T> {
373 &self.1
374 }
375
376 pub fn rest(&self) -> &[T] {
378 bytemuck::cast_slice(self.0.rest())
379 }
380}
381impl<T: bytemuck::AnyBitPattern> Iterator for IpcBytesCastIntoIter<T> {
382 type Item = T;
383
384 fn next(&mut self) -> Option<T> {
385 let size = size_of::<T>();
386 let r = *bytemuck::from_bytes(self.0.rest().get(..size)?);
387 self.0.nth(size - 1);
388 Some(r)
389 }
390
391 fn size_hint(&self) -> (usize, Option<usize>) {
392 let (mut min, mut max) = self.0.size_hint();
393 min /= size_of::<T>();
394 if let Some(max) = &mut max {
395 *max /= size_of::<T>();
396 }
397 (min, max)
398 }
399
400 fn nth(&mut self, n: usize) -> Option<T> {
401 let size = size_of::<T>();
402
403 let byte_skip = n.checked_mul(size)?;
404 let byte_end = byte_skip.checked_add(size)?;
405
406 let bytes = self.0.rest().get(byte_skip..byte_end)?;
407 let r = *bytemuck::from_bytes(bytes);
408
409 self.0.nth(byte_end - 1);
410
411 Some(r)
412 }
413
414 fn last(mut self) -> Option<Self::Item>
415 where
416 Self: Sized,
417 {
418 self.next_back()
419 }
420}
421impl<T: bytemuck::AnyBitPattern> DoubleEndedIterator for IpcBytesCastIntoIter<T> {
422 fn next_back(&mut self) -> Option<T> {
423 let size = size_of::<T>();
424
425 let len = self.0.rest().len();
426 if len < size {
427 return None;
428 }
429
430 let start = len - size;
431 let bytes = &self.0.rest()[start..];
432 let r = *bytemuck::from_bytes(bytes);
433
434 self.0.nth_back(size - 1);
435
436 Some(r)
437 }
438
439 fn nth_back(&mut self, n: usize) -> Option<T> {
440 let size = size_of::<T>();
441
442 let rev_byte_skip = n.checked_mul(size)?;
443 let rev_byte_end = rev_byte_skip.checked_add(size)?;
444 let len = self.0.rest().len();
445
446 if len < rev_byte_end {
447 return None;
448 }
449
450 let start = len - rev_byte_end;
451 let end = len - rev_byte_skip;
452
453 let bytes = &self.0.rest()[start..end];
454 let r = *bytemuck::from_bytes(bytes);
455
456 self.0.nth_back(rev_byte_end - 1);
457
458 Some(r)
459 }
460}
461impl<T: bytemuck::AnyBitPattern> FusedIterator for IpcBytesCastIntoIter<T> {}
462impl<T: bytemuck::AnyBitPattern> Default for IpcBytesCastIntoIter<T> {
463 fn default() -> Self {
464 IpcBytes::empty().cast::<T>().into_iter()
465 }
466}
467impl<T: bytemuck::AnyBitPattern> IntoIterator for IpcBytesCast<T> {
468 type Item = T;
469
470 type IntoIter = IpcBytesCastIntoIter<T>;
471
472 fn into_iter(self) -> Self::IntoIter {
473 IpcBytesCastIntoIter::new(self)
474 }
475}