dmds/world/
iter.rs

1use std::{
2    collections::btree_map,
3    fmt::Debug,
4    future::Future,
5    pin::Pin,
6    sync::{atomic, Arc, OnceLock},
7    task::Poll,
8};
9
10use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
11use bytes::BufMut;
12use futures_lite::{AsyncRead, FutureExt, Stream};
13
14use crate::{Data, IoHandle};
15
16use super::{Chunk, ChunkData, Pos, World};
17
18/// `Some` variant of [`futures_lite::ready`].
19macro_rules! ready_opt {
20    ($e:expr $(,)?) => {
21        match $e {
22            core::task::Poll::Ready(t) => t,
23            core::task::Poll::Pending => return Some(core::task::Poll::Pending),
24        }
25    };
26}
27
28/// A cell loads data in world lazily.
29#[derive(Debug)]
30pub struct Lazy<'w, T, const DIMS: usize, Io: IoHandle> {
31    world: &'w World<T, DIMS, Io>,
32    /// Pre-initialized identifier of this data.
33    id: u64,
34
35    /// Lazy-loaded dimensions of this data.
36    /// This should always be initialized in IO mode.
37    dims: OnceLock<[u64; DIMS]>,
38    /// Position of chunk this data belongs to.
39    chunk: Pos<DIMS>,
40
41    method: LoadMethod<'w, T, DIMS>,
42    /// The inner data.
43    value: OnceLock<LazyInner<'w, T, DIMS>>,
44}
45
46/// Method of loading data.
47#[derive(Debug)]
48enum LoadMethod<'w, T, const DIMS: usize> {
49    /// Load data from buffer pool in `World`.
50    Mem {
51        chunk: Arc<Chunk<T, DIMS>>,
52        guard: Arc<RwLockReadGuard<'w, ChunkData<T>>>,
53        lock: &'w RwLock<Option<T>>,
54    },
55
56    /// Decode data from bytes.
57    Io { version: u32, bin: bytes::Bytes },
58}
59
60#[derive(Debug)]
61enum LazyInner<'w, T, const DIMS: usize> {
62    Ref(RefArc<'w, T, DIMS>),
63    RefMut(RefMutArc<'w, T, DIMS>),
64    Direct(T),
65}
66
67#[derive(Debug)]
68struct RefArc<'w, T, const DIMS: usize> {
69    _chunk: Arc<Chunk<T, DIMS>>,
70    _guard: Arc<RwLockReadGuard<'w, ChunkData<T>>>,
71    guard: Option<RwLockReadGuard<'w, Option<T>>>,
72}
73
74#[derive(Debug)]
75struct RefMutArc<'w, T, const DIMS: usize> {
76    _chunk: Arc<Chunk<T, DIMS>>,
77    _guard: Arc<RwLockReadGuard<'w, ChunkData<T>>>,
78    guard: RwLockWriteGuard<'w, Option<T>>,
79}
80
81impl<'w, T: Data, const DIMS: usize, Io: IoHandle> Lazy<'w, T, DIMS, Io> {
82    /// Gets the value of dimension `0` of this data.
83    #[inline]
84    pub fn id(&self) -> u64 {
85        self.id
86    }
87
88    pub(super) fn into_inner(self) -> Option<T> {
89        if let LazyInner::Direct(val) = self.value.into_inner()? {
90            Some(val)
91        } else {
92            None
93        }
94    }
95
96    /// Gets info of dimensions of the value.
97    pub async fn dims(&self) -> crate::Result<&[u64; DIMS]> {
98        if let Some(dims) = self.dims.get() {
99            return Ok(dims);
100        }
101
102        let val = self.get().await?;
103        let mut dims = [0_u64; DIMS];
104        dims[0] = self.id;
105
106        for (index, dim) in dims.iter_mut().enumerate() {
107            if index != 0 {
108                *dim = val.dim(index);
109            }
110        }
111
112        Ok(self.dims.get_or_init(|| dims))
113    }
114
115    /// Gets the inner value immutably or initialize it
116    /// if it's uninitialized.
117    pub async fn get(&self) -> crate::Result<&T> {
118        match self.value.get() {
119            Some(LazyInner::Ref(val)) => val
120                .guard
121                .as_deref()
122                .ok_or(crate::Error::ValueNotFound)
123                .and_then(|val| val.as_ref().ok_or(crate::Error::ValueMoved)),
124            Some(LazyInner::RefMut(val)) => val.guard.as_ref().ok_or(crate::Error::ValueMoved),
125            Some(LazyInner::Direct(val)) => Ok(val),
126            None => self.init().await,
127        }
128    }
129
130    /// Initialize the inner value immutably.
131    pub(super) async fn init(&self) -> crate::Result<&T> {
132        match self.method {
133            LoadMethod::Mem {
134                ref chunk,
135                ref guard,
136                lock,
137            } => {
138                let rg = lock.read().await;
139
140                if let LazyInner::Ref(val) = self.value.get_or_init(|| {
141                    LazyInner::Ref({
142                        RefArc {
143                            _chunk: chunk.clone(),
144                            _guard: guard.clone(),
145                            guard: Some(rg),
146                        }
147                    })
148                }) {
149                    val.guard
150                        .as_deref()
151                        .unwrap()
152                        .as_ref()
153                        .ok_or(crate::Error::ValueMoved)
154                } else {
155                    unreachable!()
156                }
157            }
158            LoadMethod::Io { version, ref bin } => {
159                let _ = self.value.set(LazyInner::Direct(
160                    T::decode(version, self.dims.get().unwrap(), bin.clone())
161                        .map_err(crate::Error::Io)?,
162                ));
163
164                if let Some(LazyInner::Direct(val)) = self.value.get() {
165                    Ok(val)
166                } else {
167                    unreachable!()
168                }
169            }
170        }
171    }
172
173    /// Gets the inner value mutably or initialize it
174    /// if it's uninitialized.
175    ///
176    /// Make sure to call [`Self::close`] after modification
177    /// related to dimensional values.
178    pub async fn get_mut(&mut self) -> crate::Result<&mut T> {
179        if let Some(LazyInner::RefMut(val)) = self
180            .value
181            .get_mut()
182            // SAFETY: strange issue here, mysterious. Only this way could pass the compilation.
183            .map::<&'w mut LazyInner<'w, T, DIMS>, _>(|e| unsafe {
184                &mut *(e as *mut LazyInner<'w, T, DIMS>)
185            })
186        {
187            val.guard.as_mut().ok_or(crate::Error::ValueMoved)
188        } else {
189            self.init_mut().await
190        }
191    }
192
193    /// Move this data into the chunk it should belongs to
194    /// if this data is not suited in the chunk it belongs to now.
195    pub async fn close(self) -> crate::Result<()> {
196        let mut this = self;
197        let (world, old_chunk) = (this.world, this.chunk);
198        let Some(LazyInner::RefMut(guard)) = this.value.get_mut() else {
199            return Ok(());
200        };
201        debug_assert!(guard.guard.is_some());
202        let val = guard.guard.as_mut().unwrap();
203
204        let chunk = world.chunk_buf_of_data_or_load(val).await?;
205        if chunk.pos() != &old_chunk {
206            let val = guard.guard.take().unwrap();
207            let _ = chunk.try_insert(val).await;
208        }
209
210        Ok(())
211    }
212
213    /// Initialize the inner value mutably.
214    pub(super) async fn init_mut(&mut self) -> crate::Result<&mut T> {
215        match self.method {
216            LoadMethod::Mem {
217                ref chunk,
218                ref guard,
219                lock,
220            } => {
221                if let Some(LazyInner::Ref(val)) = self.value.get_mut() {
222                    val.guard = None;
223                }
224
225                // Set the value with locking behavior,
226                // or override the value directly if value already exists.
227                if let Some((val, dst)) = self
228                    .value
229                    .set(LazyInner::RefMut(RefMutArc {
230                        _chunk: chunk.clone(),
231                        _guard: guard.clone(),
232                        // Obtain the value lock's write guard here
233                        guard: lock.write().await,
234                    }))
235                    .err()
236                    .zip(self.value.get_mut())
237                {
238                    *dst = val
239                }
240                chunk.writes.fetch_add(1, atomic::Ordering::AcqRel);
241            }
242            LoadMethod::Io { .. } => unsafe {
243                self.load_chunk().await?;
244            },
245        }
246
247        if let Some(LazyInner::RefMut(val)) = self.value.get_mut() {
248            val.guard.as_mut().ok_or(crate::Error::ValueMoved)
249        } else {
250            unreachable!()
251        }
252    }
253
254    /// Remove this data from the chunk buffer.
255    ///
256    /// If the chunk buffer does not exist, the chunk will
257    /// be loaded into buffer pool.
258    pub async fn destroy(self) -> crate::Result<()> {
259        let this = self.get().await?;
260        let id = this.dim(0);
261        let chunk = self.world.chunk_buf_of_data_or_load(this).await?;
262        chunk.remove(id).await;
263
264        Ok(())
265    }
266
267    /// Load the chunk buffer this data belongs to to the buffer pool,
268    /// and fill this instance's lazy value with target data in chunk.
269    async unsafe fn load_chunk(&mut self) -> crate::Result<Arc<Chunk<T, DIMS>>> {
270        let chunk = self.world.load_chunk_buf(self.chunk).await;
271        // Guard of a chunk.
272        type Guard<'a, T> = RwLockReadGuard<'a, super::ChunkData<T>>;
273
274        // SAFETY: wrapping lifetime to 'w.
275        let guard: Arc<Guard<'w, T>> = Arc::new(std::mem::transmute(chunk.data.read().await));
276        let lock: &'w RwLock<Option<T>> =
277            std::mem::transmute(guard.get(&self.id).ok_or(crate::Error::ValueNotFound)?);
278
279        if let Some(LazyInner::Ref(val)) = self.value.get_mut() {
280            val.guard = None;
281        }
282
283        // Set the value with locking behavior,
284        // or override the value directly if value already exists.
285        if let Some((val, dst)) = self
286            .value
287            .set(LazyInner::RefMut(RefMutArc {
288                _chunk: chunk.clone(),
289                _guard: guard.clone(),
290                guard: lock.write().await,
291            }))
292            .err()
293            .zip(self.value.get_mut())
294        {
295            *dst = val
296        }
297
298        self.method = LoadMethod::Mem {
299            lock,
300            guard,
301            chunk: chunk.clone(),
302        };
303        Ok(chunk)
304    }
305}
306
307type IoReadFuture<'a, Io> = dyn std::future::Future<Output = std::io::Result<(u32, <Io as IoHandle>::Read<'a>)>>
308    + Send
309    + 'a;
310
311enum ChunkFromIoIter<'a, T, const DIMS: usize, Io: IoHandle> {
312    Pre {
313        world: &'a World<T, DIMS, Io>,
314        chunk: [usize; DIMS],
315        future: Pin<Box<IoReadFuture<'a, Io>>>,
316    },
317    InProgress {
318        world: &'a World<T, DIMS, Io>,
319        chunk: [usize; DIMS],
320        read: Io::Read<'a>,
321        version: u32,
322        progress: InProgress<DIMS>,
323    },
324}
325
326impl<T: Debug, const DIMS: usize, Io: IoHandle + Debug> std::fmt::Debug
327    for ChunkFromIoIter<'_, T, DIMS, Io>
328{
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        match self {
331            Self::Pre { world, chunk, .. } => f
332                .debug_struct("Pre")
333                .field("world", world)
334                .field("chunk", chunk)
335                .finish(),
336            Self::InProgress { world, chunk, .. } => f
337                .debug_struct("InProgress")
338                .field("world", world)
339                .field("chunk", chunk)
340                .finish(),
341        }
342    }
343}
344
345/// Represents the progress of reading a data from bytes.
346///
347/// # Ordering
348///
349/// - Dimensions
350/// - Data length in bytes
351/// - Data bytes
352enum InProgress<const DIMS: usize> {
353    Dims([U64Buf; DIMS]),
354    Len([u8; 4], [u64; DIMS]),
355    Data(bytes::BytesMut, [u64; DIMS]),
356}
357
358#[derive(Clone, Copy)]
359enum U64Buf {
360    Buf([u8; 8]),
361    Num(u64),
362}
363
364impl Default for U64Buf {
365    #[inline]
366    fn default() -> Self {
367        Self::Buf([0; 8])
368    }
369}
370
371impl<'a, T: Data, const DIMS: usize, Io: IoHandle> ChunkFromIoIter<'a, T, DIMS, Io> {
372    #[allow(clippy::type_complexity)]
373    fn poll_next_inner(
374        &mut self,
375        cx: &mut std::task::Context<'_>,
376    ) -> Option<std::task::Poll<Option<std::io::Result<Lazy<'a, T, DIMS, Io>>>>> // None => call this func again.
377    {
378        let this = self;
379        match this {
380            ChunkFromIoIter::InProgress {
381                read,
382                progress,
383                chunk,
384                world,
385                version,
386            } => match progress {
387                InProgress::Dims(dims) => {
388                    for dim in &mut *dims {
389                        if let U64Buf::Buf(buf) = dim {
390                            match ready_opt!(Pin::new(&mut *read).poll_read(cx, buf)) {
391                                Ok(8) => (),
392                                Ok(0) => return Some(Poll::Ready(None)),
393                                Ok(actlen) => {
394                                    return Some(Poll::Ready(Some(Err(std::io::Error::new(
395                                        std::io::ErrorKind::UnexpectedEof,
396                                        format!("read {actlen} bytes of length, expected 8 bytes"),
397                                    )))))
398                                }
399                                Err(err) => return Some(Poll::Ready(Some(Err(err)))),
400                            }
401                            *dim = U64Buf::Num(u64::from_be_bytes(*buf));
402                        }
403                    }
404
405                    *progress = InProgress::Len(
406                        [0; 4],
407                        (*dims).map(|e| {
408                            if let U64Buf::Num(num) = e {
409                                num
410                            } else {
411                                unreachable!()
412                            }
413                        }),
414                    );
415
416                    None
417                }
418                InProgress::Len(buf, dims) => match ready_opt!(Pin::new(read).poll_read(cx, buf)) {
419                    Ok(4) => {
420                        let len = u32::from_be_bytes(*buf) as usize;
421                        let mut bytes = bytes::BytesMut::with_capacity(len);
422                        bytes.put_bytes(0, len);
423                        *progress = InProgress::Data(bytes, *dims);
424                        None
425                    }
426                    Ok(len) => Some(Poll::Ready(Some(Err(std::io::Error::new(
427                        std::io::ErrorKind::UnexpectedEof,
428                        format!("read {len} bytes of length, expected 4 bytes"),
429                    ))))),
430                    Err(err) => Some(Poll::Ready(Some(Err(err)))),
431                },
432                InProgress::Data(buf, dims) => {
433                    let len = buf.len();
434                    match ready_opt!(Pin::new(&mut *read).poll_read(cx, buf)) {
435                        Ok(len_act) => {
436                            if len == len_act {
437                                let lock = OnceLock::new();
438                                lock.set(*dims).unwrap();
439                                let id = dims[0];
440
441                                let buf = buf.clone().freeze();
442
443                                *progress = InProgress::Dims([U64Buf::Buf([0; 8]); DIMS]);
444
445                                Some(Poll::Ready(Some(Ok(Lazy {
446                                    id,
447                                    dims: lock,
448                                    method: LoadMethod::Io {
449                                        bin: buf,
450                                        version: *version,
451                                    },
452                                    value: OnceLock::new(),
453                                    chunk: *chunk,
454                                    world,
455                                }))))
456                            } else {
457                                Some(Poll::Ready(Some(Err(std::io::Error::new(
458                                    std::io::ErrorKind::UnexpectedEof,
459                                    format!("read {len_act} bytes of length, expected {len} bytes"),
460                                )))))
461                            }
462                        }
463                        Err(err) => Some(Poll::Ready(Some(Err(err)))),
464                    }
465                }
466            },
467            ChunkFromIoIter::Pre {
468                future,
469                chunk,
470                world,
471            } => {
472                let (version, read) = match ready_opt!(future.as_mut().poll(cx)) {
473                    Ok(val) => val,
474                    Err(err) => {
475                        return if err.kind() == std::io::ErrorKind::NotFound {
476                            Some(Poll::Ready(None))
477                        } else {
478                            Some(Poll::Ready(Some(Err(err))))
479                        }
480                    }
481                };
482                *this = Self::InProgress {
483                    read,
484                    progress: InProgress::Dims([Default::default(); DIMS]),
485                    chunk: *chunk,
486                    world: *world,
487                    version,
488                };
489
490                None
491            }
492        }
493    }
494}
495
496impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Stream for ChunkFromIoIter<'a, T, DIMS, Io> {
497    type Item = std::io::Result<Lazy<'a, T, DIMS, Io>>;
498
499    #[inline]
500    fn poll_next(
501        self: Pin<&mut Self>,
502        cx: &mut std::task::Context<'_>,
503    ) -> std::task::Poll<Option<Self::Item>> {
504        let this = self.get_mut();
505        loop {
506            if let Some(val) = this.poll_next_inner(cx) {
507                return val;
508            }
509        }
510    }
511}
512
513#[derive(Debug)]
514struct ChunkFromMemIter<'a, T, const DIMS: usize, Io: IoHandle> {
515    world: &'a World<T, DIMS, Io>,
516    chunk_pos: [usize; DIMS],
517
518    chunk: Arc<Chunk<T, DIMS>>,
519    guard: Arc<RwLockReadGuard<'a, ChunkData<T>>>,
520
521    map_interact: ChunkFromMemIterMapInteract<'a, T>,
522}
523
524#[derive(Debug)]
525enum ChunkFromMemIterMapInteract<'a, T> {
526    /// Specify the target of iteration. With the hint,
527    /// the iterator will only iterate these hinted values,
528    /// so that other unneeded values will not be iterated.
529    Hint(ArcSliceIter<u64>),
530    Iter(btree_map::Iter<'a, u64, RwLock<Option<T>>>),
531}
532
533#[derive(Debug)]
534struct ArcSliceIter<T> {
535    ptr: usize,
536    data: Arc<[T]>,
537}
538
539impl<T: Copy> Iterator for ArcSliceIter<T> {
540    type Item = T;
541
542    #[inline]
543    fn next(&mut self) -> Option<Self::Item> {
544        if self.ptr < self.data.len() {
545            let val = self.data[self.ptr];
546            self.ptr += 1;
547            Some(val)
548        } else {
549            None
550        }
551    }
552}
553
554impl<'a, T: Data, const DIMS: usize, Io: IoHandle> ChunkFromMemIter<'a, T, DIMS, Io> {
555    fn next_inner(&mut self) -> Option<Option<Lazy<'a, T, DIMS, Io>>> {
556        let (id, lock);
557        match self.map_interact {
558            ChunkFromMemIterMapInteract::Hint(ref mut hint) => {
559                if let Some(id_hint) = hint.next() {
560                    if let Some(l) = self.guard.get(&id_hint) {
561                        id = id_hint;
562                        lock = unsafe {
563                            //SAFETY: wrapping lifetime
564                            std::mem::transmute(l)
565                        };
566                    } else {
567                        return None;
568                    }
569                } else {
570                    return Some(None);
571                }
572            }
573            ChunkFromMemIterMapInteract::Iter(ref mut iter) => {
574                let Some((i, l)) = iter.next() else {
575                    return Some(None);
576                };
577                id = *i;
578                lock = l;
579            }
580        }
581
582        Some(Some(Lazy {
583            id,
584            chunk: self.chunk_pos,
585            dims: OnceLock::new(),
586            method: LoadMethod::Mem {
587                chunk: self.chunk.clone(),
588                guard: self.guard.clone(),
589                lock,
590            },
591            value: OnceLock::new(),
592            world: self.world,
593        }))
594    }
595}
596
597impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Iterator for ChunkFromMemIter<'a, T, DIMS, Io> {
598    type Item = Lazy<'a, T, DIMS, Io>;
599
600    fn next(&mut self) -> Option<Self::Item> {
601        loop {
602            if let Some(val) = self.next_inner() {
603                return val;
604            }
605        }
606    }
607}
608
609/// An async iterator (namely stream) that iterates over a selection
610/// of chunks.
611#[derive(Debug)]
612pub struct Iter<'a, T, const DIMS: usize, Io: IoHandle> {
613    world: &'a World<T, DIMS, Io>,
614
615    shape: super::select::ShapeIter<'a, DIMS>,
616    current: Option<ChunkIter<'a, T, DIMS, Io>>,
617
618    hint: Arc<[u64]>,
619}
620
621type ReadLockFut<'a, T> =
622    dyn std::future::Future<Output = async_lock::RwLockReadGuard<'a, super::ChunkData<T>>> + 'a;
623
624enum ChunkIter<'a, T, const DIMS: usize, Io: IoHandle> {
625    Io(ChunkFromIoIter<'a, T, DIMS, Io>),
626    MemReadChunk {
627        map_ref: Arc<Chunk<T, DIMS>>,
628        fut: Pin<Box<ReadLockFut<'a, T>>>,
629        pos: [usize; DIMS],
630    },
631    Mem(ChunkFromMemIter<'a, T, DIMS, Io>),
632}
633
634impl<'a, T, const DIMS: usize, Io: IoHandle> Iter<'a, T, DIMS, Io> {
635    #[inline]
636    pub(super) fn new(
637        world: &'a World<T, DIMS, Io>,
638        shape: super::select::ShapeIter<'a, DIMS>,
639        hint: Arc<[u64]>,
640    ) -> Self {
641        Self {
642            world,
643            shape,
644            current: None,
645            hint,
646        }
647    }
648}
649
650impl<T: Debug, const DIMS: usize, Io: IoHandle + Debug> std::fmt::Debug
651    for ChunkIter<'_, T, DIMS, Io>
652{
653    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654        match self {
655            Self::Io(arg0) => f.debug_tuple("Io").field(arg0).finish(),
656            Self::MemReadChunk { map_ref, pos, .. } => f
657                .debug_struct("MemReadChunk")
658                .field("map_ref", map_ref)
659                .field("pos", pos)
660                .finish(),
661            Self::Mem(arg0) => f.debug_tuple("Mem").field(arg0).finish(),
662        }
663    }
664}
665
666unsafe impl<'a, R, T, const DIMS: usize, Io> Send for Iter<'a, T, DIMS, Io>
667where
668    R: Send,
669    T: Send,
670    Io: IoHandle<Read<'a> = R>,
671{
672}
673
674unsafe impl<'a, R, T, const DIMS: usize, Io> Sync for Iter<'a, T, DIMS, Io>
675where
676    R: Sync,
677    T: Sync,
678    Io: IoHandle<Read<'a> = R>,
679{
680}
681
682impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Iter<'a, T, DIMS, Io> {
683    #[allow(clippy::type_complexity)]
684    fn poll_next_inner(
685        &mut self,
686        cx: &mut std::task::Context<'_>,
687    ) -> Option<std::task::Poll<Option<crate::Result<Lazy<'a, T, DIMS, Io>>>>> // None => call this func again.
688    {
689        match self.current {
690            Some(ChunkIter::Io(ref mut iter)) => {
691                if let Some(val) = ready_opt!(Pin::new(iter).poll_next(cx)) {
692                    return Some(Poll::Ready(Some(val.map_err(crate::Error::Io))));
693                }
694            }
695            Some(ChunkIter::Mem(ref mut iter)) => {
696                if let Some(val) = iter.next() {
697                    return Some(Poll::Ready(Some(Ok(val))));
698                }
699            }
700            Some(ChunkIter::MemReadChunk {
701                ref map_ref,
702                ref mut fut,
703                pos,
704            }) => {
705                let guard = ready_opt!(Pin::new(fut).poll(cx));
706                self.current = Some(ChunkIter::Mem(ChunkFromMemIter {
707                    world: self.world,
708                    chunk: map_ref.clone(),
709                    map_interact: if self.hint.is_empty() {
710                        ChunkFromMemIterMapInteract::Iter(unsafe {
711                            //SAFETY: wrapping lifetime
712                            std::mem::transmute(guard.iter())
713                        })
714                    } else {
715                        ChunkFromMemIterMapInteract::Hint(ArcSliceIter {
716                            ptr: 0,
717                            data: self.hint.clone(),
718                        })
719                    },
720                    guard: Arc::new(guard),
721                    chunk_pos: pos,
722                }));
723                return None;
724            }
725            None => (),
726        }
727
728        if let Some(pos) = self.shape.next() {
729            if let Some(chunk_l) = self.world.chunks_buf.get(&pos) {
730                self.current = Some(ChunkIter::MemReadChunk {
731                    // SAFETY: wrapping lifetime
732                    fut: unsafe { std::mem::transmute(chunk_l.value().data.read().boxed()) },
733                    map_ref: chunk_l.value().clone(),
734                    pos,
735                });
736            } else if self.world.io_handle.hint_is_valid(&pos) {
737                self.current = Some(ChunkIter::Io(ChunkFromIoIter::Pre {
738                    world: self.world,
739                    future: self.world.io_handle.read_chunk(pos).boxed(),
740                    chunk: pos,
741                }));
742            } else {
743                self.current = None;
744                return None;
745            }
746
747            None
748        } else {
749            Some(Poll::Ready(None))
750        }
751    }
752}
753
754impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Stream for Iter<'a, T, DIMS, Io> {
755    type Item = crate::Result<Lazy<'a, T, DIMS, Io>>;
756
757    #[inline]
758    fn poll_next(
759        self: std::pin::Pin<&mut Self>,
760        cx: &mut std::task::Context<'_>,
761    ) -> std::task::Poll<Option<Self::Item>> {
762        let this = self.get_mut();
763        loop {
764            if let Some(val) = this.poll_next_inner(cx) {
765                return val;
766            }
767        }
768    }
769}