use std::any::Any;
use std::collections::HashMap;
use std::fmt::Display;
use std::hash::Hash;
use std::io::Result;
use std::sync::{Arc, Condvar, Mutex, WaitTimeoutResult};
use std::time::{Duration, Instant};

use crate::cache::state::{BlobRangeMap, ChunkIndexGetter, ChunkMap, IndexedChunkMap, RangeMap};
use crate::cache::SINGLE_INFLIGHT_WAIT_TIMEOUT;
use crate::device::BlobChunkInfo;
use crate::{StorageError, StorageResult};
18
/// Lifecycle state of a chunk-load request tracked by a [`Slot`].
#[derive(PartialEq, Copy, Clone)]
enum Status {
    /// A backend IO request for the chunk is currently in progress.
    Inflight,
    /// The request has finished; waiters should re-check chunk readiness.
    Complete,
}
24
/// Per-chunk synchronization primitive: pairs the request [`Status`] with a
/// condvar so threads can block until a peer finishes loading the same chunk.
struct Slot {
    // Guards the status; condvar waits are performed against this mutex.
    state: Mutex<Status>,
    condvar: Condvar,
}
29
30impl Slot {
31 fn new() -> Self {
32 Slot {
33 state: Mutex::new(Status::Inflight),
34 condvar: Condvar::new(),
35 }
36 }
37
38 fn notify(&self) {
39 self.condvar.notify_all();
40 }
41
42 fn done(&self) {
43 *self.state.lock().unwrap() = Status::Complete;
45 self.notify();
46 }
47
48 fn wait_for_inflight(&self, timeout: Duration) -> StorageResult<Status> {
49 let mut state = self.state.lock().unwrap();
50 let mut tor: WaitTimeoutResult;
51
52 while *state == Status::Inflight {
53 let r = self.condvar.wait_timeout(state, timeout).unwrap();
55 state = r.0;
56 tor = r.1;
57 if tor.timed_out() {
58 return Err(StorageError::Timeout);
59 }
60 }
61
62 Ok(*state)
63 }
64}
65
/// An adapter that augments a [`ChunkMap`]/[`RangeMap`] implementation with
/// inflight-request tracking, so that concurrent requests for the same chunk
/// are deduplicated: one thread performs the backend IO while the others wait.
pub struct BlobStateMap<C, I> {
    // The wrapped map that records persistent per-chunk readiness.
    c: C,
    // Chunks (keyed by index) with a backend request currently in flight;
    // waiters block on the per-chunk `Slot`.
    inflight_tracer: Mutex<HashMap<I, Arc<Slot>>>,
}
77
78impl<C, I> From<C> for BlobStateMap<C, I>
79where
80 C: ChunkMap + ChunkIndexGetter<Index = I>,
81 I: Eq + Hash + Display,
82{
83 fn from(c: C) -> Self {
84 Self {
85 c,
86 inflight_tracer: Mutex::new(HashMap::new()),
87 }
88 }
89}
90
impl<C, I> ChunkMap for BlobStateMap<C, I>
where
    C: ChunkMap + ChunkIndexGetter<Index = I>,
    I: Eq + Hash + Display + Send + 'static,
{
    /// Delegate readiness queries to the wrapped map.
    fn is_ready(&self, chunk: &dyn BlobChunkInfo) -> Result<bool> {
        self.c.is_ready(chunk)
    }

    /// A chunk is pending while an entry for its index sits in the tracer.
    fn is_pending(&self, chunk: &dyn BlobChunkInfo) -> Result<bool> {
        let index = C::get_index(chunk);
        Ok(self.inflight_tracer.lock().unwrap().get(&index).is_some())
    }

    /// Check whether the chunk is ready, marking it as inflight otherwise.
    ///
    /// If another thread already owns the inflight slot for this chunk, block
    /// (bounded by `SINGLE_INFLIGHT_WAIT_TIMEOUT`) until that thread finishes
    /// and then re-evaluate recursively. Returns `Ok(true)` when the chunk is
    /// ready, `Ok(false)` when the caller has become responsible for loading
    /// it (and must later call `set_ready_and_clear_pending()` or
    /// `clear_pending()`), or `Err(StorageError::Timeout)` when waiting on a
    /// peer expired.
    fn check_ready_and_mark_pending(&self, chunk: &dyn BlobChunkInfo) -> StorageResult<bool> {
        let mut ready = self.c.is_ready(chunk).map_err(StorageError::CacheIndex)?;

        if ready {
            return Ok(true);
        }

        let index = C::get_index(chunk);
        let mut guard = self.inflight_tracer.lock().unwrap();

        if let Some(i) = guard.get(&index).cloned() {
            // Another thread is loading this chunk: release the tracer lock
            // before blocking so requests for other chunks can proceed.
            drop(guard);
            let result = i.wait_for_inflight(Duration::from_millis(SINGLE_INFLIGHT_WAIT_TIMEOUT));
            if let Err(StorageError::Timeout) = result {
                warn!(
                    "Waiting for backend IO expires. chunk index {}, compressed offset {}",
                    index,
                    chunk.compressed_offset()
                );

                Err(StorageError::Timeout)
            } else {
                // The peer finished (or the slot was cleared); re-run the
                // whole check because the peer may have failed, leaving the
                // chunk still not ready.
                self.check_ready_and_mark_pending(chunk)
            }
        } else {
            // Double-check readiness while holding the tracer lock to close
            // the race with a peer that completed between the first check and
            // lock acquisition.
            if self.c.is_ready(chunk).map_err(StorageError::CacheIndex)? {
                ready = true;
            } else {
                // Claim the inflight slot; the caller is now responsible for
                // loading this chunk.
                guard.insert(index, Arc::new(Slot::new()));
            }
            Ok(ready)
        }
    }

    /// Persist readiness in the wrapped map, then wake any waiters.
    fn set_ready_and_clear_pending(&self, chunk: &dyn BlobChunkInfo) -> Result<()> {
        let res = self.c.set_ready_and_clear_pending(chunk);
        self.clear_pending(chunk);
        res
    }

    /// Drop the chunk's inflight slot (if any) and notify blocked waiters.
    fn clear_pending(&self, chunk: &dyn BlobChunkInfo) {
        let index = C::get_index(chunk);
        let mut guard = self.inflight_tracer.lock().unwrap();
        if let Some(i) = guard.remove(&index) {
            i.done();
        }
    }

    fn is_persist(&self) -> bool {
        self.c.is_persist()
    }

    /// Expose the range-map interface, available only for the
    /// `BlobStateMap<IndexedChunkMap, u32>` instantiation.
    fn as_range_map(&self) -> Option<&dyn RangeMap<I = u32>> {
        let any = self as &dyn Any;

        any.downcast_ref::<BlobStateMap<IndexedChunkMap, u32>>()
            .map(|v| v as &dyn RangeMap<I = u32>)
    }
}
168
impl RangeMap for BlobStateMap<IndexedChunkMap, u32> {
    // For this instantiation the tracer keys are chunk indices.
    type I = u32;

    fn is_range_all_ready(&self) -> bool {
        self.c.is_range_all_ready()
    }

    fn is_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
        self.c.is_range_ready(start, count)
    }

    /// Check a chunk-index range and claim inflight slots for not-ready
    /// chunks.
    ///
    /// Returns `Ok(None)` when nothing needs loading, otherwise the subset of
    /// indices this caller became responsible for; indices already inflight
    /// in another thread are filtered out.
    fn check_range_ready_and_mark_pending(
        &self,
        start: Self::I,
        count: Self::I,
    ) -> Result<Option<Vec<Self::I>>> {
        let pending = match self.c.check_range_ready_and_mark_pending(start, count) {
            Err(e) => return Err(e),
            Ok(None) => return Ok(None),
            Ok(Some(v)) => {
                if v.is_empty() {
                    return Ok(None);
                }
                v
            }
        };

        let mut res = Vec::with_capacity(pending.len());
        let mut guard = self.inflight_tracer.lock().unwrap();
        for index in pending.iter() {
            if guard.get(index).is_none() {
                // Re-check under the tracer lock: a peer may have completed
                // this chunk between the wrapped check and lock acquisition.
                if !self.c.is_range_ready(*index, 1)? {
                    guard.insert(*index, Arc::new(Slot::new()));
                    res.push(*index);
                }
            }
        }

        Ok(Some(res))
    }

    /// Persist readiness for the range, then wake waiters on those chunks.
    fn set_range_ready_and_clear_pending(&self, start: Self::I, count: Self::I) -> Result<()> {
        let res = self.c.set_range_ready_and_clear_pending(start, count);
        self.clear_range_pending(start, count);
        res
    }

    /// Remove inflight slots for `[start, start + count)` and wake waiters.
    fn clear_range_pending(&self, start: Self::I, count: Self::I) {
        // Clamp so `start + count` cannot overflow u32.
        let count = std::cmp::min(count, u32::MAX - start);
        let end = start + count;
        let mut guard = self.inflight_tracer.lock().unwrap();

        for index in start..end {
            if let Some(i) = guard.remove(&index) {
                i.done();
            }
        }
    }

    /// Wait until all chunks in the range are ready or a wait times out.
    ///
    /// Returns `Ok(false)` as soon as a chunk turns out not ready after its
    /// inflight slot was cleared; otherwise re-checks the whole range at the
    /// end (a timeout merely breaks out to that final check).
    fn wait_for_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
        let count = std::cmp::min(count, u32::MAX - start);
        let end = start + count;
        if self.is_range_ready(start, count)? {
            return Ok(true);
        }

        let mut guard = self.inflight_tracer.lock().unwrap();
        for index in start..end {
            if let Some(i) = guard.get(&index).cloned() {
                // Release the tracer lock while blocking on the slot.
                drop(guard);
                let result =
                    i.wait_for_inflight(Duration::from_millis(SINGLE_INFLIGHT_WAIT_TIMEOUT));
                if let Err(StorageError::Timeout) = result {
                    warn!(
                        "Waiting for range backend IO expires. chunk index {}. range[{}, {}]",
                        index, start, count
                    );
                    break;
                };
                if !self.c.is_range_ready(index, 1)? {
                    return Ok(false);
                }
                // Re-acquire the lock before inspecting the next index.
                guard = self.inflight_tracer.lock().unwrap();
            }
        }

        self.is_range_ready(start, count)
    }
}
260
impl RangeMap for BlobStateMap<BlobRangeMap, u64> {
    // For this instantiation the tracer keys are u64 offsets (region indices
    // shifted by `self.c.shift`, matching `clear_range_pending` below).
    type I = u64;

    fn is_range_all_ready(&self) -> bool {
        self.c.is_range_all_ready()
    }

    fn is_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
        self.c.is_range_ready(start, count)
    }

    /// Check a range and claim inflight slots for regions that need loading.
    ///
    /// Returns `Ok(None)` when nothing needs loading, otherwise the subset of
    /// offsets this caller became responsible for. The pending values are
    /// used directly as tracer keys — presumably shifted region offsets
    /// produced by the wrapped `BlobRangeMap`; TODO confirm against
    /// `BlobRangeMap::check_range_ready_and_mark_pending`.
    fn check_range_ready_and_mark_pending(
        &self,
        start: Self::I,
        count: Self::I,
    ) -> Result<Option<Vec<Self::I>>> {
        let pending = match self.c.check_range_ready_and_mark_pending(start, count) {
            Err(e) => return Err(e),
            Ok(None) => return Ok(None),
            Ok(Some(v)) => {
                if v.is_empty() {
                    return Ok(None);
                }
                v
            }
        };

        let mut res = Vec::with_capacity(pending.len());
        let mut guard = self.inflight_tracer.lock().unwrap();
        for index in pending.iter() {
            if guard.get(index).is_none() {
                // Re-check under the tracer lock in case a peer completed the
                // region between the wrapped check and lock acquisition.
                if !self.c.is_range_ready(*index, 1)? {
                    guard.insert(*index, Arc::new(Slot::new()));
                    res.push(*index);
                }
            }
        }

        Ok(Some(res))
    }

    /// Persist readiness for the range, then wake waiters on those regions.
    fn set_range_ready_and_clear_pending(&self, start: Self::I, count: Self::I) -> Result<()> {
        let res = self.c.set_range_ready_and_clear_pending(start, count);
        self.clear_range_pending(start, count);
        res
    }

    /// Remove inflight slots covering the range and wake their waiters.
    fn clear_range_pending(&self, start: Self::I, count: Self::I) {
        let (start_index, end_index) = match self.c.get_range(start, count) {
            Ok(v) => v,
            Err(_) => {
                // An invalid range is a programming error; ignore in release
                // builds rather than poison state.
                debug_assert!(false);
                return;
            }
        };

        let mut guard = self.inflight_tracer.lock().unwrap();
        for index in start_index..end_index {
            // Convert the region index back to the offset used as tracer key.
            let idx = (index as u64) << self.c.shift;
            if let Some(i) = guard.remove(&idx) {
                i.done();
            }
        }
    }

    /// Wait until the range is ready or an inflight waiter times out.
    fn wait_for_range_ready(&self, start: Self::I, count: Self::I) -> Result<bool> {
        if self.c.is_range_ready(start, count)? {
            return Ok(true);
        }

        let (start_index, end_index) = self.c.get_range(start, count)?;
        let mut guard = self.inflight_tracer.lock().unwrap();
        for index in start_index..end_index {
            let idx = (index as u64) << self.c.shift;
            if let Some(i) = guard.get(&idx).cloned() {
                // Release the tracer lock while blocking on the slot.
                drop(guard);
                let result =
                    i.wait_for_inflight(Duration::from_millis(SINGLE_INFLIGHT_WAIT_TIMEOUT));
                if let Err(StorageError::Timeout) = result {
                    warn!(
                        "Waiting for range backend IO expires. chunk index {}. range[{}, {}]",
                        index, start, count
                    );
                    break;
                };
                // NOTE(review): readiness is probed with the shifted offset
                // and a count of 1 — confirm the wrapped map accepts a 1-byte
                // probe at a region boundary.
                if !self.c.is_range_ready(idx, 1)? {
                    return Ok(false);
                }
                // Re-acquire the lock before inspecting the next region.
                guard = self.inflight_tracer.lock().unwrap();
            }
        }

        self.c.is_range_ready(start, count)
    }
}
358
359impl BlobStateMap<BlobRangeMap, u64> {
360 pub fn from_range_map(map: BlobRangeMap) -> Self {
362 Self {
363 c: map,
364 inflight_tracer: Mutex::new(HashMap::new()),
365 }
366 }
367}
368
369#[cfg(test)]
370pub(crate) mod tests {
371 use std::sync::Arc;
372 use std::thread;
373 use std::time::Instant;
374
375 use nydus_utils::digest::Algorithm::Blake3;
376 use nydus_utils::digest::{Algorithm, RafsDigest};
377 use vmm_sys_util::tempdir::TempDir;
378 use vmm_sys_util::tempfile::TempFile;
379
380 use super::*;
381 use crate::cache::state::DigestedChunkMap;
382 use crate::device::BlobChunkInfo;
383 use crate::test::MockChunkInfo;
384
    /// Test-only chunk carrying an index and a digest derived from it.
    struct Chunk {
        index: u32,
        digest: RafsDigest,
    }
389
390 impl Chunk {
391 fn new(index: u32) -> Arc<Self> {
392 Arc::new(Self {
393 index,
394 digest: RafsDigest::from_buf(
395 unsafe { std::slice::from_raw_parts(&index as *const u32 as *const u8, 4) },
396 Algorithm::Blake3,
397 ),
398 })
399 }
400 }
401
    // Minimal BlobChunkInfo implementation for the tests: only the accessors
    // the chunk-map code touches are backed by data; the rest panic if
    // called, which makes any unexpected access fail loudly.
    impl BlobChunkInfo for Chunk {
        fn chunk_id(&self) -> &RafsDigest {
            &self.digest
        }

        fn id(&self) -> u32 {
            self.index
        }

        fn blob_index(&self) -> u32 {
            0
        }

        fn compressed_offset(&self) -> u64 {
            unimplemented!();
        }

        fn compressed_size(&self) -> u32 {
            unimplemented!();
        }

        fn uncompressed_offset(&self) -> u64 {
            unimplemented!();
        }

        fn uncompressed_size(&self) -> u32 {
            unimplemented!();
        }

        fn is_batch(&self) -> bool {
            unimplemented!();
        }

        fn is_compressed(&self) -> bool {
            unimplemented!();
        }

        fn is_encrypted(&self) -> bool {
            false
        }

        fn has_crc32(&self) -> bool {
            unimplemented!();
        }

        fn crc32(&self) -> u32 {
            unimplemented!();
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
455
    /// Concurrency smoke test: two threads mark chunks ready (skipping every
    /// `skip_index`-th one) through separate maps backed by the same blob
    /// file, then a third map verifies the persisted per-chunk state.
    #[test]
    fn test_chunk_map() {
        let dir = TempDir::new().unwrap();
        let blob_path = dir.as_path().join("blob-1");
        let blob_path = blob_path.as_os_str().to_str().unwrap().to_string();
        let chunk_count = 1000000;
        let skip_index = 77;

        let indexed_chunk_map1 = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap(),
        ));
        let indexed_chunk_map2 = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap(),
        ));
        let indexed_chunk_map3 = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap(),
        ));

        let now = Instant::now();

        let h1 = thread::spawn(move || {
            for idx in 0..chunk_count {
                let chunk = Chunk::new(idx);
                if idx % skip_index != 0 {
                    indexed_chunk_map1
                        .set_ready_and_clear_pending(chunk.as_ref())
                        .unwrap();
                }
            }
        });

        let h2 = thread::spawn(move || {
            for idx in 0..chunk_count {
                let chunk = Chunk::new(idx);
                if idx % skip_index != 0 {
                    indexed_chunk_map2
                        .set_ready_and_clear_pending(chunk.as_ref())
                        .unwrap();
                }
            }
        });

        h1.join()
            .map_err(|e| {
                error!("Join error {:?}", e);
                e
            })
            .unwrap();
        h2.join()
            .map_err(|e| {
                error!("Join error {:?}", e);
                e
            })
            .unwrap();

        println!(
            "IndexedChunkMap Concurrency: {}ms",
            now.elapsed().as_millis()
        );

        // Readiness is persisted in the shared blob file, so map3 must
        // observe exactly the chunks the writer threads marked.
        for idx in 0..chunk_count {
            let chunk = Chunk::new(idx);

            let has_ready = indexed_chunk_map3
                .check_ready_and_mark_pending(chunk.as_ref())
                .unwrap();
            if idx % skip_index == 0 {
                if has_ready {
                    panic!("indexed chunk map: index {} shouldn't be ready", idx);
                }
            } else if !has_ready {
                panic!("indexed chunk map: index {} should be ready", idx);
            }
        }
    }
531
532 fn iterate(chunks: &[Arc<Chunk>], chunk_map: &dyn ChunkMap, chunk_count: u32) {
533 for idx in 0..chunk_count {
534 chunk_map
535 .set_ready_and_clear_pending(chunks[idx as usize].as_ref())
536 .unwrap();
537 }
538 for idx in 0..chunk_count {
539 assert!(chunk_map
540 .check_ready_and_mark_pending(chunks[idx as usize].as_ref())
541 .unwrap(),);
542 }
543 }
544
    /// Rough benchmark comparing `IndexedChunkMap` with `DigestedChunkMap`;
    /// it only prints the timings, asserting nothing about them.
    #[test]
    fn test_chunk_map_perf() {
        let dir = TempDir::new().unwrap();
        let blob_path = dir.as_path().join("blob-1");
        let blob_path = blob_path.as_os_str().to_str().unwrap().to_string();
        let chunk_count = 1000000;

        let mut chunks = Vec::new();
        for idx in 0..chunk_count {
            chunks.push(Chunk::new(idx))
        }

        let indexed_chunk_map =
            BlobStateMap::from(IndexedChunkMap::new(&blob_path, chunk_count, true).unwrap());
        let now = Instant::now();
        iterate(&chunks, &indexed_chunk_map as &dyn ChunkMap, chunk_count);
        let elapsed1 = now.elapsed().as_millis();

        let digested_chunk_map = BlobStateMap::from(DigestedChunkMap::new());
        let now = Instant::now();
        iterate(&chunks, &digested_chunk_map as &dyn ChunkMap, chunk_count);
        let elapsed2 = now.elapsed().as_millis();

        println!(
            "IndexedChunkMap vs DigestedChunkMap: {}ms vs {}ms",
            elapsed1, elapsed2
        );
    }
573
    /// Exercise the inflight-tracer bookkeeping on both an indexed and a
    /// digested chunk map: marking pending, duplicate marks timing out,
    /// completing, and clearing without completing.
    #[test]
    fn test_inflight_tracer() {
        let chunk_1: Arc<dyn BlobChunkInfo> = Arc::new({
            let mut c = MockChunkInfo::new();
            c.index = 1;
            c.block_id = RafsDigest::from_buf("hello world".as_bytes(), Blake3);
            c
        });
        let chunk_2: Arc<dyn BlobChunkInfo> = Arc::new({
            let mut c = MockChunkInfo::new();
            c.index = 2;
            c.block_id = RafsDigest::from_buf("hello world 2".as_bytes(), Blake3);
            c
        });
        let tmp_file = TempFile::new().unwrap();
        let index_map = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
        ));
        index_map
            .check_ready_and_mark_pending(chunk_1.as_ref())
            .unwrap();
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 1);
        index_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap();
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 2);
        // Re-marking an already inflight chunk blocks until the wait times
        // out, so these calls must return an error.
        index_map
            .check_ready_and_mark_pending(chunk_1.as_ref())
            .unwrap_err();
        index_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap_err();
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 2);

        index_map
            .set_ready_and_clear_pending(chunk_1.as_ref())
            .unwrap();
        assert!(index_map
            .check_ready_and_mark_pending(chunk_1.as_ref())
            .unwrap(),);
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 1);

        // clear_pending() removes the slot without marking the chunk ready,
        // so the next check re-marks it as pending.
        index_map.clear_pending(chunk_2.as_ref());
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 0);
        assert!(!index_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap(),);
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 1);
        index_map.clear_pending(chunk_2.as_ref());
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 0);
        index_map
            .set_ready_and_clear_pending(chunk_2.as_ref())
            .unwrap();
        assert!(index_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap(),);
        assert_eq!(index_map.inflight_tracer.lock().unwrap().len(), 0);

        // Repeat the same scenario with a digest-keyed chunk map.
        let digest_map = Arc::new(BlobStateMap::from(DigestedChunkMap::new()));
        digest_map
            .check_ready_and_mark_pending(chunk_1.as_ref())
            .unwrap();
        assert_eq!(digest_map.inflight_tracer.lock().unwrap().len(), 1);
        digest_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap();
        assert_eq!(digest_map.inflight_tracer.lock().unwrap().len(), 2);
        digest_map
            .check_ready_and_mark_pending(chunk_1.as_ref())
            .unwrap_err();
        digest_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap_err();
        digest_map
            .set_ready_and_clear_pending(chunk_1.as_ref())
            .unwrap();
        assert!(digest_map
            .check_ready_and_mark_pending(chunk_1.as_ref())
            .unwrap(),);
        digest_map.clear_pending(chunk_2.as_ref());
        assert!(!digest_map
            .check_ready_and_mark_pending(chunk_2.as_ref())
            .unwrap(),);
        digest_map.clear_pending(chunk_2.as_ref());
        assert_eq!(digest_map.inflight_tracer.lock().unwrap().len(), 0);
    }
662
    /// Readers racing against a writer on a single chunk: both reader threads
    /// must block on the inflight slot until the main thread marks the chunk
    /// ready, after which every check observes it as ready.
    #[test]
    fn test_inflight_tracer_race() {
        let tmp_file = TempFile::new().unwrap();
        let map = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
        ));

        let chunk_4: Arc<dyn BlobChunkInfo> = Arc::new({
            let mut c = MockChunkInfo::new();
            c.index = 4;
            c
        });

        // The first check claims the inflight slot for chunk 4.
        assert!(!map
            .as_ref()
            .check_ready_and_mark_pending(chunk_4.as_ref())
            .unwrap(),);
        let map_cloned = map.clone();
        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);

        let chunk_4_cloned = chunk_4.clone();
        let t1 = thread::Builder::new()
            .spawn(move || {
                for _ in 0..4 {
                    // Blocks on the slot until the main thread completes it.
                    let ready = map_cloned
                        .check_ready_and_mark_pending(chunk_4_cloned.as_ref())
                        .unwrap();
                    assert!(ready);
                }
            })
            .unwrap();

        let map_cloned_2 = map.clone();
        let chunk_4_cloned_2 = chunk_4.clone();
        let t2 = thread::Builder::new()
            .spawn(move || {
                for _ in 0..2 {
                    let ready = map_cloned_2
                        .check_ready_and_mark_pending(chunk_4_cloned_2.as_ref())
                        .unwrap();
                    assert!(ready);
                }
            })
            .unwrap();

        thread::sleep(Duration::from_secs(1));

        map.set_ready_and_clear_pending(chunk_4.as_ref()).unwrap();

        // Completing an already-ready chunk repeatedly must be harmless.
        map.set_ready_and_clear_pending(chunk_4.as_ref()).unwrap();
        map.set_ready_and_clear_pending(chunk_4.as_ref()).unwrap();

        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 0);

        t1.join().unwrap();
        t2.join().unwrap();
    }
721
    /// If the slot owner never completes the chunk, every waiter must give up
    /// with a timeout (bounded by SINGLE_INFLIGHT_WAIT_TIMEOUT) while the
    /// slot stays in the tracer until explicitly cleared.
    #[test]
    fn test_inflight_tracer_timeout() {
        let tmp_file = TempFile::new().unwrap();
        let map = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
        ));

        let chunk_4: Arc<dyn BlobChunkInfo> = Arc::new({
            let mut c = MockChunkInfo::new();
            c.index = 4;
            c
        });

        map.as_ref()
            .check_ready_and_mark_pending(chunk_4.as_ref())
            .unwrap();
        let map_cloned = map.clone();

        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);

        let chunk_4_cloned = chunk_4.clone();
        let t1 = thread::Builder::new()
            .spawn(move || {
                for _ in 0..4 {
                    // Each wait expires because the owner never calls done().
                    map_cloned
                        .check_ready_and_mark_pending(chunk_4_cloned.as_ref())
                        .unwrap_err();
                }
            })
            .unwrap();

        t1.join().unwrap();

        // The timed-out waiters must not have removed the owner's slot.
        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);

        map.as_ref()
            .check_ready_and_mark_pending(chunk_4.as_ref())
            .unwrap_err();
        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 1);

        map.clear_pending(chunk_4.as_ref());
        assert_eq!(map.inflight_tracer.lock().unwrap().len(), 0);
    }
772
    /// Exercise the RangeMap interface of BlobStateMap<IndexedChunkMap, u32>:
    /// bounds checking, claiming sub-ranges, and full-range completion.
    #[test]
    fn test_inflight_tracer_race_range() {
        let tmp_file = TempFile::new().unwrap();
        let map = Arc::new(BlobStateMap::from(
            IndexedChunkMap::new(tmp_file.as_path().to_str().unwrap(), 10, true).unwrap(),
        ));

        assert!(!map.is_range_all_ready());
        assert!(!map.is_range_ready(0, 1).unwrap());
        assert!(!map.is_range_ready(9, 1).unwrap());
        // Index 10 is out of bounds for a 10-chunk map.
        assert!(map.is_range_ready(10, 1).is_err());
        assert_eq!(
            map.check_range_ready_and_mark_pending(0, 2).unwrap(),
            Some(vec![0, 1])
        );
        map.set_range_ready_and_clear_pending(0, 2).unwrap();
        assert_eq!(map.check_range_ready_and_mark_pending(0, 2).unwrap(), None);
        map.wait_for_range_ready(0, 2).unwrap();
        // Only chunk 2 of the range [1, 2] still needs loading.
        assert_eq!(
            map.check_range_ready_and_mark_pending(1, 2).unwrap(),
            Some(vec![2])
        );
        map.set_range_ready_and_clear_pending(2, 1).unwrap();
        map.set_range_ready_and_clear_pending(3, 7).unwrap();
        assert!(map.is_range_ready(0, 1).unwrap());
        assert!(map.is_range_ready(9, 1).unwrap());
        assert!(map.is_range_all_ready());
    }
801}