1use std::{
2 collections::btree_map,
3 fmt::Debug,
4 future::Future,
5 pin::Pin,
6 sync::{atomic, Arc, OnceLock},
7 task::Poll,
8};
9
10use async_lock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
11use bytes::BufMut;
12use futures_lite::{AsyncRead, FutureExt, Stream};
13
14use crate::{Data, IoHandle};
15
16use super::{Chunk, ChunkData, Pos, World};
17
/// Unwraps a [`core::task::Poll`] inside a function returning `Option<Poll<_>>`.
///
/// Evaluates to the inner value when the expression is `Poll::Ready`;
/// otherwise it returns `Some(Poll::Pending)` from the enclosing function.
macro_rules! ready_opt {
    ($e:expr $(,)?) => {
        match $e {
            core::task::Poll::Pending => return Some(core::task::Poll::Pending),
            core::task::Poll::Ready(value) => value,
        }
    };
}
27
/// Handle to a single value of a [`World`] that is loaded on first access.
///
/// How the value is obtained is described by `method`; once obtained, the
/// value (or the lock guards giving access to it) is cached in `value`.
#[derive(Debug)]
pub struct Lazy<'w, T, const DIMS: usize, Io: IoHandle> {
    /// World this handle was created from.
    world: &'w World<T, DIMS, Io>,
    /// Identifier of the value; also used as the first entry of `dims`.
    id: u64,

    /// Cached dimension values; set up front when the handle is produced
    /// from I/O, otherwise filled by the first call to `dims()`.
    dims: OnceLock<[u64; DIMS]>,
    /// Position of the chunk this handle was produced from.
    chunk: Pos<DIMS>,

    /// How the value is obtained when it is first accessed.
    method: LoadMethod<'w, T, DIMS>,
    /// The loaded value or the guards that give access to it.
    value: OnceLock<LazyInner<'w, T, DIMS>>,
}
45
/// Source from which a [`Lazy`] obtains its value.
#[derive(Debug)]
enum LoadMethod<'w, T, const DIMS: usize> {
    /// The value lives in a chunk that is held in memory.
    Mem {
        /// Chunk containing the value; cloned into the guards built on access.
        chunk: Arc<Chunk<T, DIMS>>,
        /// Shared read guard over the chunk's data.
        guard: Arc<RwLockReadGuard<'w, ChunkData<T>>>,
        /// Lock of the individual value inside the chunk data.
        lock: &'w RwLock<Option<T>>,
    },

    /// The value is still encoded: `bin` holds the bytes read from I/O and
    /// `version` is passed to `T::decode` together with them.
    Io { version: u32, bin: bytes::Bytes },
}
59
/// Loaded state cached inside a [`Lazy`].
#[derive(Debug)]
enum LazyInner<'w, T, const DIMS: usize> {
    /// Shared access through a read guard on the value's lock.
    Ref(RefArc<'w, T, DIMS>),
    /// Exclusive access through a write guard on the value's lock.
    RefMut(RefMutArc<'w, T, DIMS>),
    /// Value owned by the handle itself (decoded from I/O bytes).
    Direct(T),
}
66
/// Read access to a value stored in an in-memory chunk.
#[derive(Debug)]
struct RefArc<'w, T, const DIMS: usize> {
    // `_chunk` and `_guard` are never read in this file; presumably they keep
    // the chunk and its data guard alive for as long as `guard` is held.
    // TODO confirm.
    _chunk: Arc<Chunk<T, DIMS>>,
    _guard: Arc<RwLockReadGuard<'w, ChunkData<T>>>,
    /// Read guard on the value's own lock; reset to `None` before the handle
    /// switches to write access.
    guard: Option<RwLockReadGuard<'w, Option<T>>>,
}
73
/// Write access to a value stored in an in-memory chunk.
#[derive(Debug)]
struct RefMutArc<'w, T, const DIMS: usize> {
    // `_chunk` and `_guard` are never read in this file; presumably they keep
    // the chunk and its data guard alive for as long as `guard` is held.
    // TODO confirm.
    _chunk: Arc<Chunk<T, DIMS>>,
    _guard: Arc<RwLockReadGuard<'w, ChunkData<T>>>,
    /// Write guard on the value's own lock; the slot is `None` once the value
    /// has been taken out.
    guard: RwLockWriteGuard<'w, Option<T>>,
}
80
81impl<'w, T: Data, const DIMS: usize, Io: IoHandle> Lazy<'w, T, DIMS, Io> {
    /// Returns the identifier of the value this handle refers to.
    #[inline]
    pub fn id(&self) -> u64 {
        self.id
    }
87
88 pub(super) fn into_inner(self) -> Option<T> {
89 if let LazyInner::Direct(val) = self.value.into_inner()? {
90 Some(val)
91 } else {
92 None
93 }
94 }
95
96 pub async fn dims(&self) -> crate::Result<&[u64; DIMS]> {
98 if let Some(dims) = self.dims.get() {
99 return Ok(dims);
100 }
101
102 let val = self.get().await?;
103 let mut dims = [0_u64; DIMS];
104 dims[0] = self.id;
105
106 for (index, dim) in dims.iter_mut().enumerate() {
107 if index != 0 {
108 *dim = val.dim(index);
109 }
110 }
111
112 Ok(self.dims.get_or_init(|| dims))
113 }
114
115 pub async fn get(&self) -> crate::Result<&T> {
118 match self.value.get() {
119 Some(LazyInner::Ref(val)) => val
120 .guard
121 .as_deref()
122 .ok_or(crate::Error::ValueNotFound)
123 .and_then(|val| val.as_ref().ok_or(crate::Error::ValueMoved)),
124 Some(LazyInner::RefMut(val)) => val.guard.as_ref().ok_or(crate::Error::ValueMoved),
125 Some(LazyInner::Direct(val)) => Ok(val),
126 None => self.init().await,
127 }
128 }
129
130 pub(super) async fn init(&self) -> crate::Result<&T> {
132 match self.method {
133 LoadMethod::Mem {
134 ref chunk,
135 ref guard,
136 lock,
137 } => {
138 let rg = lock.read().await;
139
140 if let LazyInner::Ref(val) = self.value.get_or_init(|| {
141 LazyInner::Ref({
142 RefArc {
143 _chunk: chunk.clone(),
144 _guard: guard.clone(),
145 guard: Some(rg),
146 }
147 })
148 }) {
149 val.guard
150 .as_deref()
151 .unwrap()
152 .as_ref()
153 .ok_or(crate::Error::ValueMoved)
154 } else {
155 unreachable!()
156 }
157 }
158 LoadMethod::Io { version, ref bin } => {
159 let _ = self.value.set(LazyInner::Direct(
160 T::decode(version, self.dims.get().unwrap(), bin.clone())
161 .map_err(crate::Error::Io)?,
162 ));
163
164 if let Some(LazyInner::Direct(val)) = self.value.get() {
165 Ok(val)
166 } else {
167 unreachable!()
168 }
169 }
170 }
171 }
172
    /// Returns a mutable reference to the value.
    ///
    /// When the handle already holds a write guard it is reused; otherwise
    /// `init_mut` is awaited to obtain one.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::ValueMoved`] when the slot behind the write
    /// guard is empty, and propagates any error from `init_mut`.
    pub async fn get_mut(&mut self) -> crate::Result<&mut T> {
        // NOTE(review): the raw-pointer round trip detaches the borrow of
        // `self.value` from `self`, which lets the `else` branch borrow `self`
        // again. Presumably a workaround for the borrow checker rejecting a
        // conditionally returned borrow — confirm before touching.
        if let Some(LazyInner::RefMut(val)) = self
            .value
            .get_mut()
            .map::<&'w mut LazyInner<'w, T, DIMS>, _>(|e| unsafe {
                // SAFETY (as far as this block shows): the pointer comes from
                // a live `&mut` into `self.value`, and the extended reference
                // is only used in the `if` branch, where `self` is not
                // accessed in any other way.
                &mut *(e as *mut LazyInner<'w, T, DIMS>)
            })
        {
            val.guard.as_mut().ok_or(crate::Error::ValueMoved)
        } else {
            self.init_mut().await
        }
    }
192
193 pub async fn close(self) -> crate::Result<()> {
196 let mut this = self;
197 let (world, old_chunk) = (this.world, this.chunk);
198 let Some(LazyInner::RefMut(guard)) = this.value.get_mut() else {
199 return Ok(());
200 };
201 debug_assert!(guard.guard.is_some());
202 let val = guard.guard.as_mut().unwrap();
203
204 let chunk = world.chunk_buf_of_data_or_load(val).await?;
205 if chunk.pos() != &old_chunk {
206 let val = guard.guard.take().unwrap();
207 let _ = chunk.try_insert(val).await;
208 }
209
210 Ok(())
211 }
212
    /// Switches the handle to exclusive access and returns the value mutably.
    ///
    /// * `Mem`: releases a cached read guard (if any), takes the write lock on
    ///   the value and stores the guard, replacing whatever was cached.
    /// * `Io`: delegates to `load_chunk`, which loads the chunk and performs
    ///   the same switch.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::ValueMoved`] when the slot behind the write
    /// guard is empty, and propagates errors from `load_chunk`.
    ///
    /// NOTE(review): if the cached state is already `RefMut`, the `Mem` branch
    /// awaits a second write lock on the same value while the first guard is
    /// still held. `get_mut` avoids this by checking first — confirm that
    /// other callers do too.
    pub(super) async fn init_mut(&mut self) -> crate::Result<&mut T> {
        match self.method {
            LoadMethod::Mem {
                ref chunk,
                ref guard,
                lock,
            } => {
                // Release our own read guard first — presumably so the write
                // lock requested below does not wait on it.
                if let Some(LazyInner::Ref(val)) = self.value.get_mut() {
                    val.guard = None;
                }

                // `set` only succeeds on an empty cell; when it fails it hands
                // the new state back, which then overwrites the existing one.
                if let Some((val, dst)) = self
                    .value
                    .set(LazyInner::RefMut(RefMutArc {
                        _chunk: chunk.clone(),
                        _guard: guard.clone(),
                        guard: lock.write().await,
                    }))
                    .err()
                    .zip(self.value.get_mut())
                {
                    *dst = val
                }
                chunk.writes.fetch_add(1, atomic::Ordering::AcqRel);
            }
            // NOTE(review): only the `Mem` branch bumps `chunk.writes`; the
            // chunk returned by `load_chunk` is discarded here — confirm
            // whether the counter should be incremented on this path as well.
            LoadMethod::Io { .. } => unsafe {
                self.load_chunk().await?;
            },
        }

        if let Some(LazyInner::RefMut(val)) = self.value.get_mut() {
            val.guard.as_mut().ok_or(crate::Error::ValueMoved)
        } else {
            unreachable!()
        }
    }
253
254 pub async fn destroy(self) -> crate::Result<()> {
259 let this = self.get().await?;
260 let id = this.dim(0);
261 let chunk = self.world.chunk_buf_of_data_or_load(this).await?;
262 chunk.remove(id).await;
263
264 Ok(())
265 }
266
    /// Loads this handle's chunk into memory and switches the handle to
    /// exclusive, in-memory access.
    ///
    /// After a successful call the cached state is `RefMut` and the load
    /// method is `Mem`. The loaded chunk is returned.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::ValueNotFound`] when the loaded chunk has no
    /// entry for this handle's id.
    ///
    /// # Safety
    ///
    /// The read guard on the chunk data and the reference to the value's lock
    /// have their lifetimes extended to `'w` by `transmute`.
    /// NOTE(review): soundness presumably relies on the `Arc`s stored next to
    /// them keeping the chunk and its guard alive for as long as the
    /// references are used — confirm against `World`/`Chunk`.
    async unsafe fn load_chunk(&mut self) -> crate::Result<Arc<Chunk<T, DIMS>>> {
        let chunk = self.world.load_chunk_buf(self.chunk).await;
        type Guard<'a, T> = RwLockReadGuard<'a, super::ChunkData<T>>;

        // Lifetime-extended read guard over the chunk's data.
        let guard: Arc<Guard<'w, T>> = Arc::new(std::mem::transmute(chunk.data.read().await));
        // Lifetime-extended reference to this value's lock inside the data.
        let lock: &'w RwLock<Option<T>> =
            std::mem::transmute(guard.get(&self.id).ok_or(crate::Error::ValueNotFound)?);

        // Release a cached read guard before asking for the write lock.
        if let Some(LazyInner::Ref(val)) = self.value.get_mut() {
            val.guard = None;
        }

        // `set` only succeeds on an empty cell; when it fails it hands the new
        // state back, which then overwrites the existing one.
        if let Some((val, dst)) = self
            .value
            .set(LazyInner::RefMut(RefMutArc {
                _chunk: chunk.clone(),
                _guard: guard.clone(),
                guard: lock.write().await,
            }))
            .err()
            .zip(self.value.get_mut())
        {
            *dst = val
        }

        self.method = LoadMethod::Mem {
            lock,
            guard,
            chunk: chunk.clone(),
        };
        Ok(chunk)
    }
305}
306
/// Future resolving to a `(version, reader)` pair for a chunk, or an I/O error.
type IoReadFuture<'a, Io> = dyn std::future::Future<Output = std::io::Result<(u32, <Io as IoHandle>::Read<'a>)>>
    + Send
    + 'a;
310
/// Stream state for reading the values of one chunk from I/O.
enum ChunkFromIoIter<'a, T, const DIMS: usize, Io: IoHandle> {
    /// The reader for the chunk is still being opened.
    Pre {
        /// World the produced handles belong to.
        world: &'a World<T, DIMS, Io>,
        /// Position of the chunk being read.
        chunk: [usize; DIMS],
        /// Pending future yielding the version and the reader.
        future: Pin<Box<IoReadFuture<'a, Io>>>,
    },
    /// The reader is open and records are being read from it.
    InProgress {
        /// World the produced handles belong to.
        world: &'a World<T, DIMS, Io>,
        /// Position of the chunk being read.
        chunk: [usize; DIMS],
        /// Reader the records are pulled from.
        read: Io::Read<'a>,
        /// Version handed to every produced handle for decoding.
        version: u32,
        /// Which part of the current record is being read.
        progress: InProgress<DIMS>,
    },
}
325
326impl<T: Debug, const DIMS: usize, Io: IoHandle + Debug> std::fmt::Debug
327 for ChunkFromIoIter<'_, T, DIMS, Io>
328{
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 match self {
331 Self::Pre { world, chunk, .. } => f
332 .debug_struct("Pre")
333 .field("world", world)
334 .field("chunk", chunk)
335 .finish(),
336 Self::InProgress { world, chunk, .. } => f
337 .debug_struct("InProgress")
338 .field("world", world)
339 .field("chunk", chunk)
340 .finish(),
341 }
342 }
343}
344
/// Part of a record that is currently being read from I/O.
///
/// A record is read as its dimension values, then a 4-byte length, then the
/// payload of that length.
enum InProgress<const DIMS: usize> {
    /// Reading the dimension values; each entry is raw bytes until parsed.
    Dims([U64Buf; DIMS]),
    /// Reading the 4-byte payload length; carries the parsed dimensions.
    Len([u8; 4], [u64; DIMS]),
    /// Reading the payload into a pre-sized buffer; carries the parsed
    /// dimensions.
    Data(bytes::BytesMut, [u64; DIMS]),
}
357
/// A `u64` field that is either still a raw 8-byte buffer or already parsed.
#[derive(Clone, Copy)]
enum U64Buf {
    /// Raw bytes that have not been parsed yet.
    Buf([u8; 8]),
    /// Parsed number.
    Num(u64),
}

impl Default for U64Buf {
    /// Starts out as a zero-filled, not yet parsed buffer.
    #[inline]
    fn default() -> Self {
        U64Buf::Buf([0_u8; 8])
    }
}
370
371impl<'a, T: Data, const DIMS: usize, Io: IoHandle> ChunkFromIoIter<'a, T, DIMS, Io> {
372 #[allow(clippy::type_complexity)]
373 fn poll_next_inner(
374 &mut self,
375 cx: &mut std::task::Context<'_>,
376 ) -> Option<std::task::Poll<Option<std::io::Result<Lazy<'a, T, DIMS, Io>>>>> {
378 let this = self;
379 match this {
380 ChunkFromIoIter::InProgress {
381 read,
382 progress,
383 chunk,
384 world,
385 version,
386 } => match progress {
387 InProgress::Dims(dims) => {
388 for dim in &mut *dims {
389 if let U64Buf::Buf(buf) = dim {
390 match ready_opt!(Pin::new(&mut *read).poll_read(cx, buf)) {
391 Ok(8) => (),
392 Ok(0) => return Some(Poll::Ready(None)),
393 Ok(actlen) => {
394 return Some(Poll::Ready(Some(Err(std::io::Error::new(
395 std::io::ErrorKind::UnexpectedEof,
396 format!("read {actlen} bytes of length, expected 8 bytes"),
397 )))))
398 }
399 Err(err) => return Some(Poll::Ready(Some(Err(err)))),
400 }
401 *dim = U64Buf::Num(u64::from_be_bytes(*buf));
402 }
403 }
404
405 *progress = InProgress::Len(
406 [0; 4],
407 (*dims).map(|e| {
408 if let U64Buf::Num(num) = e {
409 num
410 } else {
411 unreachable!()
412 }
413 }),
414 );
415
416 None
417 }
418 InProgress::Len(buf, dims) => match ready_opt!(Pin::new(read).poll_read(cx, buf)) {
419 Ok(4) => {
420 let len = u32::from_be_bytes(*buf) as usize;
421 let mut bytes = bytes::BytesMut::with_capacity(len);
422 bytes.put_bytes(0, len);
423 *progress = InProgress::Data(bytes, *dims);
424 None
425 }
426 Ok(len) => Some(Poll::Ready(Some(Err(std::io::Error::new(
427 std::io::ErrorKind::UnexpectedEof,
428 format!("read {len} bytes of length, expected 4 bytes"),
429 ))))),
430 Err(err) => Some(Poll::Ready(Some(Err(err)))),
431 },
432 InProgress::Data(buf, dims) => {
433 let len = buf.len();
434 match ready_opt!(Pin::new(&mut *read).poll_read(cx, buf)) {
435 Ok(len_act) => {
436 if len == len_act {
437 let lock = OnceLock::new();
438 lock.set(*dims).unwrap();
439 let id = dims[0];
440
441 let buf = buf.clone().freeze();
442
443 *progress = InProgress::Dims([U64Buf::Buf([0; 8]); DIMS]);
444
445 Some(Poll::Ready(Some(Ok(Lazy {
446 id,
447 dims: lock,
448 method: LoadMethod::Io {
449 bin: buf,
450 version: *version,
451 },
452 value: OnceLock::new(),
453 chunk: *chunk,
454 world,
455 }))))
456 } else {
457 Some(Poll::Ready(Some(Err(std::io::Error::new(
458 std::io::ErrorKind::UnexpectedEof,
459 format!("read {len_act} bytes of length, expected {len} bytes"),
460 )))))
461 }
462 }
463 Err(err) => Some(Poll::Ready(Some(Err(err)))),
464 }
465 }
466 },
467 ChunkFromIoIter::Pre {
468 future,
469 chunk,
470 world,
471 } => {
472 let (version, read) = match ready_opt!(future.as_mut().poll(cx)) {
473 Ok(val) => val,
474 Err(err) => {
475 return if err.kind() == std::io::ErrorKind::NotFound {
476 Some(Poll::Ready(None))
477 } else {
478 Some(Poll::Ready(Some(Err(err))))
479 }
480 }
481 };
482 *this = Self::InProgress {
483 read,
484 progress: InProgress::Dims([Default::default(); DIMS]),
485 chunk: *chunk,
486 world: *world,
487 version,
488 };
489
490 None
491 }
492 }
493 }
494}
495
496impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Stream for ChunkFromIoIter<'a, T, DIMS, Io> {
497 type Item = std::io::Result<Lazy<'a, T, DIMS, Io>>;
498
499 #[inline]
500 fn poll_next(
501 self: Pin<&mut Self>,
502 cx: &mut std::task::Context<'_>,
503 ) -> std::task::Poll<Option<Self::Item>> {
504 let this = self.get_mut();
505 loop {
506 if let Some(val) = this.poll_next_inner(cx) {
507 return val;
508 }
509 }
510 }
511}
512
/// Iterator over the values of one chunk that is held in memory.
#[derive(Debug)]
struct ChunkFromMemIter<'a, T, const DIMS: usize, Io: IoHandle> {
    /// World the produced handles belong to.
    world: &'a World<T, DIMS, Io>,
    /// Position of the chunk being iterated.
    chunk_pos: [usize; DIMS],

    /// The chunk itself; cloned into every produced handle.
    chunk: Arc<Chunk<T, DIMS>>,
    /// Shared read guard over the chunk's data; cloned into every handle.
    guard: Arc<RwLockReadGuard<'a, ChunkData<T>>>,

    /// How the entries of the chunk data are visited.
    map_interact: ChunkFromMemIterMapInteract<'a, T>,
}
523
/// Strategy for visiting the entries of a chunk's data map.
#[derive(Debug)]
enum ChunkFromMemIterMapInteract<'a, T> {
    /// Look up only the hinted ids; ids missing from the chunk are skipped.
    Hint(ArcSliceIter<u64>),
    /// Walk every entry of the map.
    Iter(btree_map::Iter<'a, u64, RwLock<Option<T>>>),
}
532
/// Owning iterator over an `Arc<[T]>` that yields copies of its elements.
#[derive(Debug)]
struct ArcSliceIter<T> {
    /// Index of the next element to yield.
    ptr: usize,
    /// Shared slice being iterated.
    data: Arc<[T]>,
}

impl<T: Copy> Iterator for ArcSliceIter<T> {
    type Item = T;

    /// Yields the element at the cursor and advances it; `None` once the
    /// cursor is past the end (the cursor then stays where it is).
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.data.get(self.ptr).copied()?;
        self.ptr += 1;
        Some(item)
    }
}
553
554impl<'a, T: Data, const DIMS: usize, Io: IoHandle> ChunkFromMemIter<'a, T, DIMS, Io> {
555 fn next_inner(&mut self) -> Option<Option<Lazy<'a, T, DIMS, Io>>> {
556 let (id, lock);
557 match self.map_interact {
558 ChunkFromMemIterMapInteract::Hint(ref mut hint) => {
559 if let Some(id_hint) = hint.next() {
560 if let Some(l) = self.guard.get(&id_hint) {
561 id = id_hint;
562 lock = unsafe {
563 std::mem::transmute(l)
565 };
566 } else {
567 return None;
568 }
569 } else {
570 return Some(None);
571 }
572 }
573 ChunkFromMemIterMapInteract::Iter(ref mut iter) => {
574 let Some((i, l)) = iter.next() else {
575 return Some(None);
576 };
577 id = *i;
578 lock = l;
579 }
580 }
581
582 Some(Some(Lazy {
583 id,
584 chunk: self.chunk_pos,
585 dims: OnceLock::new(),
586 method: LoadMethod::Mem {
587 chunk: self.chunk.clone(),
588 guard: self.guard.clone(),
589 lock,
590 },
591 value: OnceLock::new(),
592 world: self.world,
593 }))
594 }
595}
596
597impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Iterator for ChunkFromMemIter<'a, T, DIMS, Io> {
598 type Item = Lazy<'a, T, DIMS, Io>;
599
600 fn next(&mut self) -> Option<Self::Item> {
601 loop {
602 if let Some(val) = self.next_inner() {
603 return val;
604 }
605 }
606 }
607}
608
/// Stream of [`Lazy`] handles for the values in a selection of chunks.
///
/// Chunk positions are taken from `shape`. Chunks already held in memory are
/// iterated in place; other chunks are read through the world's I/O handle
/// when it reports the position as valid.
#[derive(Debug)]
pub struct Iter<'a, T, const DIMS: usize, Io: IoHandle> {
    /// World being iterated.
    world: &'a World<T, DIMS, Io>,

    /// Source of the chunk positions to visit.
    shape: super::select::ShapeIter<'a, DIMS>,
    /// Iteration state of the chunk currently being visited, if any.
    current: Option<ChunkIter<'a, T, DIMS, Io>>,

    /// Ids to look up in in-memory chunks; when empty, every entry is visited.
    hint: Arc<[u64]>,
}
620
/// Future resolving to a read guard over a chunk's data.
type ReadLockFut<'a, T> =
    dyn std::future::Future<Output = async_lock::RwLockReadGuard<'a, super::ChunkData<T>>> + 'a;
623
/// Iteration state for the chunk an [`Iter`] is currently visiting.
enum ChunkIter<'a, T, const DIMS: usize, Io: IoHandle> {
    /// The chunk is being read from I/O.
    Io(ChunkFromIoIter<'a, T, DIMS, Io>),
    /// The chunk is in memory and its data read lock is being acquired.
    MemReadChunk {
        /// The in-memory chunk.
        map_ref: Arc<Chunk<T, DIMS>>,
        /// Pending read-lock future on the chunk's data.
        fut: Pin<Box<ReadLockFut<'a, T>>>,
        /// Position of the chunk.
        pos: [usize; DIMS],
    },
    /// The chunk is in memory and its entries are being iterated.
    Mem(ChunkFromMemIter<'a, T, DIMS, Io>),
}
633
634impl<'a, T, const DIMS: usize, Io: IoHandle> Iter<'a, T, DIMS, Io> {
635 #[inline]
636 pub(super) fn new(
637 world: &'a World<T, DIMS, Io>,
638 shape: super::select::ShapeIter<'a, DIMS>,
639 hint: Arc<[u64]>,
640 ) -> Self {
641 Self {
642 world,
643 shape,
644 current: None,
645 hint,
646 }
647 }
648}
649
650impl<T: Debug, const DIMS: usize, Io: IoHandle + Debug> std::fmt::Debug
651 for ChunkIter<'_, T, DIMS, Io>
652{
653 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
654 match self {
655 Self::Io(arg0) => f.debug_tuple("Io").field(arg0).finish(),
656 Self::MemReadChunk { map_ref, pos, .. } => f
657 .debug_struct("MemReadChunk")
658 .field("map_ref", map_ref)
659 .field("pos", pos)
660 .finish(),
661 Self::Mem(arg0) => f.debug_tuple("Mem").field(arg0).finish(),
662 }
663 }
664}
665
// Manually asserts that `Iter` is `Send` when the value type and the reader
// type are `Send`.
//
// NOTE(review): this is not derived automatically — at least the boxed
// `ReadLockFut` stored in `ChunkIter::MemReadChunk` carries no `Send` bound.
// SAFETY relies on invariants that are not visible in this file (for example
// that the read-lock future, `World` and `Chunk` may really be used across
// threads under these bounds) — confirm.
unsafe impl<'a, R, T, const DIMS: usize, Io> Send for Iter<'a, T, DIMS, Io>
where
    R: Send,
    T: Send,
    Io: IoHandle<Read<'a> = R>,
{
}
673
// Manually asserts that `Iter` is `Sync` when the value type and the reader
// type are `Sync`.
//
// NOTE(review): this is not derived automatically — at least the boxed
// `ReadLockFut` stored in `ChunkIter::MemReadChunk` carries no `Sync` bound.
// SAFETY relies on invariants that are not visible in this file — confirm.
unsafe impl<'a, R, T, const DIMS: usize, Io> Sync for Iter<'a, T, DIMS, Io>
where
    R: Sync,
    T: Sync,
    Io: IoHandle<Read<'a> = R>,
{
}
681
impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Iter<'a, T, DIMS, Io> {
    /// Advances the iteration by one step.
    ///
    /// Returns `None` when the internal state changed and the caller should
    /// call again right away, and `Some(poll)` when `poll` is the result to
    /// hand out (`Pending`, the next handle, an error, or the end of the
    /// stream).
    ///
    /// The current chunk is drained first. Once it is exhausted (or there is
    /// none), the next position is taken from `shape`: a chunk found in
    /// `chunks_buf` is iterated in memory after acquiring its data read lock,
    /// any other position is read through the I/O handle if `hint_is_valid`
    /// accepts it, and is skipped otherwise.
    #[allow(clippy::type_complexity)]
    fn poll_next_inner(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Option<std::task::Poll<Option<crate::Result<Lazy<'a, T, DIMS, Io>>>>> {
        match self.current {
            Some(ChunkIter::Io(ref mut iter)) => {
                // An exhausted I/O stream falls through to the next position.
                if let Some(val) = ready_opt!(Pin::new(iter).poll_next(cx)) {
                    return Some(Poll::Ready(Some(val.map_err(crate::Error::Io))));
                }
            }
            Some(ChunkIter::Mem(ref mut iter)) => {
                // An exhausted in-memory iterator falls through likewise.
                if let Some(val) = iter.next() {
                    return Some(Poll::Ready(Some(Ok(val))));
                }
            }
            Some(ChunkIter::MemReadChunk {
                ref map_ref,
                ref mut fut,
                pos,
            }) => {
                let guard = ready_opt!(Pin::new(fut).poll(cx));
                self.current = Some(ChunkIter::Mem(ChunkFromMemIter {
                    world: self.world,
                    chunk: map_ref.clone(),
                    // Without a hint every entry is visited; with one, only
                    // the hinted ids are looked up.
                    map_interact: if self.hint.is_empty() {
                        ChunkFromMemIterMapInteract::Iter(unsafe {
                            // SAFETY (as far as this block shows): only the
                            // lifetime of the map iterator is extended. The
                            // guard it borrows from is stored in the same
                            // struct right below — presumably that keeps the
                            // map alive while the iterator is used. TODO
                            // confirm.
                            std::mem::transmute(guard.iter())
                        })
                    } else {
                        ChunkFromMemIterMapInteract::Hint(ArcSliceIter {
                            ptr: 0,
                            data: self.hint.clone(),
                        })
                    },
                    guard: Arc::new(guard),
                    chunk_pos: pos,
                }));
                return None;
            }
            None => (),
        }

        if let Some(pos) = self.shape.next() {
            if let Some(chunk_l) = self.world.chunks_buf.get(&pos) {
                self.current = Some(ChunkIter::MemReadChunk {
                    // SAFETY (as far as this block shows): the lifetime of the
                    // read-lock future is extended to `'a`; `map_ref` stores a
                    // clone of the chunk next to it — presumably that keeps
                    // the lock alive. TODO confirm.
                    fut: unsafe { std::mem::transmute(chunk_l.value().data.read().boxed()) },
                    map_ref: chunk_l.value().clone(),
                    pos,
                });
            } else if self.world.io_handle.hint_is_valid(&pos) {
                self.current = Some(ChunkIter::Io(ChunkFromIoIter::Pre {
                    world: self.world,
                    future: self.world.io_handle.read_chunk(pos).boxed(),
                    chunk: pos,
                }));
            } else {
                // Neither in memory nor accepted by the I/O handle: skip it.
                self.current = None;
                return None;
            }

            None
        } else {
            Some(Poll::Ready(None))
        }
    }
}
753
754impl<'a, T: Data, const DIMS: usize, Io: IoHandle> Stream for Iter<'a, T, DIMS, Io> {
755 type Item = crate::Result<Lazy<'a, T, DIMS, Io>>;
756
757 #[inline]
758 fn poll_next(
759 self: std::pin::Pin<&mut Self>,
760 cx: &mut std::task::Context<'_>,
761 ) -> std::task::Poll<Option<Self::Item>> {
762 let this = self.get_mut();
763 loop {
764 if let Some(val) = this.poll_next_inner(cx) {
765 return val;
766 }
767 }
768 }
769}